Skip to content

Commit

Permalink
Backport fixes in #11075
Browse files Browse the repository at this point in the history
Introduced back the functions to convert k8s size values to float, but
moved to kfp.dsl.utils

Signed-off-by: Ricardo M. Oliveira <rmartine@redhat.com>
  • Loading branch information
rimolive committed Nov 25, 2024
1 parent 467f30c commit 8ea2606
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 4 deletions.
1 change: 1 addition & 0 deletions sdk/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
## Deprecations

## Bug fixes and other changes
* Backport fixes in kubeflow/pipelines#11075. [\#11392])(https://github.com/kubeflow/pipelines/pull/11392)

## Documentation updates

Expand Down
18 changes: 18 additions & 0 deletions sdk/python/kfp/compiler/compiler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3437,29 +3437,47 @@ def simple_pipeline():
self.assertEqual(
'5', dict_format['deploymentSpec']['executors']['exec-return-1-2']
['container']['resources']['resourceCpuLimit'])
self.assertEqual(
5.0, dict_format['deploymentSpec']['executors']['exec-return-1-2']
['container']['resources']['cpuLimit'])
self.assertNotIn(
'memoryLimit', dict_format['deploymentSpec']['executors']
['exec-return-1-2']['container']['resources'])

self.assertEqual(
'50G', dict_format['deploymentSpec']['executors']['exec-return-1-3']
['container']['resources']['resourceMemoryLimit'])
self.assertEqual(
50.0, dict_format['deploymentSpec']['executors']['exec-return-1-3']
['container']['resources']['memoryLimit'])
self.assertNotIn(
'cpuLimit', dict_format['deploymentSpec']['executors']
['exec-return-1-3']['container']['resources'])

self.assertEqual(
'2', dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['resourceCpuRequest'])
self.assertEqual(
2.0, dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['cpuRequest'])
self.assertEqual(
'5', dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['resourceCpuLimit'])
self.assertEqual(
5.0, dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['cpuLimit'])
self.assertEqual(
'4G', dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['resourceMemoryRequest'])
self.assertEqual(
4.0, dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['memoryRequest'])
self.assertEqual(
'50G', dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['resourceMemoryLimit'])
self.assertEqual(
50.0, dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['memoryLimit'])


class TestPlatformConfig(unittest.TestCase):
Expand Down
78 changes: 78 additions & 0 deletions sdk/python/kfp/compiler/compiler_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@

import collections
import copy
import re
from typing import DefaultDict, Dict, List, Mapping, Set, Tuple, Union

from kfp import dsl
from kfp.dsl import constants
from kfp.dsl import for_loop
from kfp.dsl import pipeline_channel
from kfp.dsl import pipeline_context
Expand Down Expand Up @@ -803,3 +805,79 @@ def recursive_replace_placeholders(data: Union[Dict, List], old_value: str,
if isinstance(data, pipeline_channel.PipelineChannel):
data = str(data)
return new_value if data == old_value else data


def validate_cpu_request_limit_to_float(cpu: str) -> float:
"""Validates cpu request/limit string and converts to its numeric float
value.
Args:
cpu: CPU requests or limits. This string should be a number or a
number followed by an "m" to indicate millicores (1/1000). For
more information, see `Specify a CPU Request and a CPU Limit
<https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#specify-a-cpu-request-and-a-cpu-limit>`_.
Raises:
ValueError if the cpu request/limit string value is invalid.
Returns:
The numeric value (float) of the cpu request/limit.
"""
if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None:
raise ValueError(
'Invalid cpu string. Should be float or integer, or integer'
' followed by "m".')

return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu)


def validate_memory_request_limit_to_float(memory: str) -> float:
"""Validates memory request/limit string and converts to its numeric value.
Args:
memory: Memory requests or limits. This string should be a number or
a number followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G",
"Gi", "M", "Mi", "K", or "Ki".
Raises:
ValueError if the memory request/limit string value is invalid.
Returns:
The numeric value (float) of the memory request/limit.
"""
if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$',
memory) is None:
raise ValueError(
'Invalid memory string. Should be a number or a number '
'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", '
'"Gi", "M", "Mi", "K", "Ki".')

if memory.endswith('E'):
memory = float(memory[:-1]) * constants._E / constants._G
elif memory.endswith('Ei'):
memory = float(memory[:-2]) * constants._EI / constants._G
elif memory.endswith('P'):
memory = float(memory[:-1]) * constants._P / constants._G
elif memory.endswith('Pi'):
memory = float(memory[:-2]) * constants._PI / constants._G
elif memory.endswith('T'):
memory = float(memory[:-1]) * constants._T / constants._G
elif memory.endswith('Ti'):
memory = float(memory[:-2]) * constants._TI / constants._G
elif memory.endswith('G'):
memory = float(memory[:-1])
elif memory.endswith('Gi'):
memory = float(memory[:-2]) * constants._GI / constants._G
elif memory.endswith('M'):
memory = float(memory[:-1]) * constants._M / constants._G
elif memory.endswith('Mi'):
memory = float(memory[:-2]) * constants._MI / constants._G
elif memory.endswith('K'):
memory = float(memory[:-1]) * constants._K / constants._G
elif memory.endswith('Ki'):
memory = float(memory[:-2]) * constants._KI / constants._G
else:
# By default interpret as a plain integer, in the unit of Bytes.
memory = float(memory) / constants._G

return memory
12 changes: 12 additions & 0 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,17 +648,29 @@ def convert_to_placeholder(input_value: str) -> str:
container_spec.resources.resource_cpu_request = (
convert_to_placeholder(
task.container_spec.resources.cpu_request))
container_spec.resources.cpu_request = compiler_utils.validate_cpu_request_limit_to_float(
cpu=convert_to_placeholder(
task.container_spec.resources.cpu_request))
if task.container_spec.resources.cpu_limit is not None:
container_spec.resources.resource_cpu_limit = (
convert_to_placeholder(task.container_spec.resources.cpu_limit))
container_spec.resources.cpu_limit = compiler_utils.validate_cpu_request_limit_to_float(
cpu=convert_to_placeholder(
task.container_spec.resources.cpu_limit))
if task.container_spec.resources.memory_request is not None:
container_spec.resources.resource_memory_request = (
convert_to_placeholder(
task.container_spec.resources.memory_request))
container_spec.resources.memory_request = compiler_utils.validate_memory_request_limit_to_float(
memory=convert_to_placeholder(
task.container_spec.resources.memory_request))
if task.container_spec.resources.memory_limit is not None:
container_spec.resources.resource_memory_limit = (
convert_to_placeholder(
task.container_spec.resources.memory_limit))
container_spec.resources.memory_limit = compiler_utils.validate_memory_request_limit_to_float(
memory=convert_to_placeholder(
task.container_spec.resources.memory_limit))
if task.container_spec.resources.accelerator_count is not None:
container_spec.resources.accelerator.CopyFrom(
pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec
Expand Down
12 changes: 8 additions & 4 deletions sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,17 @@ deploymentSpec:
resources:
accelerator:
count: '1'
type: tpu-v3
type: 'tpu-v3'
resourceCount: '1'
resourceType: tpu-v3
resourceType: 'tpu-v3'
cpuLimit: 4.0
cpuRequest: 2.0
memoryLimit: 15.032385536
memoryRequest: 4.294967296
resourceCpuLimit: '4'
resourceCpuRequest: '2'
resourceMemoryLimit: 14Gi
resourceMemoryRequest: 4Gi
resourceMemoryLimit: '14Gi'
resourceMemoryRequest: '4Gi'
pipelineInfo:
description: A linear two-step pipeline with resource specification.
name: two-step-pipeline-with-resource-spec
Expand Down

0 comments on commit 8ea2606

Please sign in to comment.