Skip to content

Commit

Permalink
check that requested cpu/memory less than or equals according limits
Browse files Browse the repository at this point in the history
Signed-off-by: ntny <ntny1986@gmail.com>
  • Loading branch information
ntny committed Oct 14, 2024
1 parent 880e46d commit 15aef12
Show file tree
Hide file tree
Showing 2 changed files with 257 additions and 0 deletions.
56 changes: 56 additions & 0 deletions sdk/python/kfp/dsl/pipeline_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,16 @@ def set_caching_options(self, enable_caching: bool) -> 'PipelineTask':
self._task_spec.enable_caching = enable_caching
return self

def _ensure_resource_requests_meet_limits(self) -> None:
resources = self.container_spec.resources
if (resources.memory_request is not None and
resources.memory_limit is not None and
self._parse_memory_str_request(resources.memory_request) >
self._parse_memory_str_request(resources.memory_limit)):
raise ValueError(
f'Requested memory: {resources.memory_request} cannot be greater than memory limit: {resources.memory_limit}. '
'Check the set_memory_request and set_memory_limit parameters.')

def _ensure_container_spec_exists(self) -> None:
"""Ensures that the task has a container spec."""
caller_method_name = inspect.stack()[1][3]
Expand Down Expand Up @@ -452,6 +462,48 @@ def set_gpu_limit(self, gpu: str) -> 'PipelineTask':
category=DeprecationWarning)
return self.set_accelerator_limit(gpu)

def _parse_memory_str_request(self, memory_str: str) -> float:
memory_request = float(0)
if memory_str.endswith('E'):
memory_request = float(
memory_str[:-1]) * constants._E / constants._G
elif memory_str.endswith('Ei'):
memory_request = float(
memory_str[:-2]) * constants._EI / constants._G
elif memory_str.endswith('P'):
memory_request = float(
memory_str[:-1]) * constants._P / constants._G
elif memory_str.endswith('Pi'):
memory_request = float(
memory_str[:-2]) * constants._PI / constants._G
elif memory_str.endswith('T'):
memory_request = float(
memory_str[:-1]) * constants._T / constants._G
elif memory_str.endswith('Ti'):
memory_request = float(
memory_str[:-2]) * constants._TI / constants._G
elif memory_str.endswith('G'):
memory_request = float(memory_str[:-1])
elif memory_str.endswith('Gi'):
memory_request = float(
memory_str[:-2]) * constants._GI / constants._G
elif memory_str.endswith('M'):
memory_request = float(
memory_str[:-1]) * constants._M / constants._G
elif memory_str.endswith('Mi'):
memory_request = float(
memory_str[:-2]) * constants._MI / constants._G
elif memory_str.endswith('K'):
memory_request = float(
memory_str[:-1]) * constants._K / constants._G
elif memory_str.endswith('Ki'):
memory_request = float(
memory_str[:-2]) * constants._KI / constants._G
else:
# By default interpret as a plain integer, in the unit of Bytes.
memory_request = float(memory_str) / constants._G
return memory_request

def _validate_memory_request_limit(self, memory: str) -> str:
"""Validates memory request/limit string and converts to its numeric
string value.
Expand Down Expand Up @@ -503,6 +555,8 @@ def set_memory_request(
self.container_spec.resources = structures.ResourceSpec(
memory_request=memory)

self._ensure_resource_requests_meet_limits()

return self

@block_if_final()
Expand Down Expand Up @@ -530,6 +584,8 @@ def set_memory_limit(
self.container_spec.resources = structures.ResourceSpec(
memory_limit=memory)

self._ensure_resource_requests_meet_limits()

return self

@block_if_final()
Expand Down
201 changes: 201 additions & 0 deletions sdk/python/kfp/dsl/pipeline_task_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,207 @@ def test_set_accelerator_limit(self, limit, expected_limit):
self.assertEqual(expected_limit,
task.container_spec.resources.accelerator_count)

@parameterized.parameters(
{
'memory': '2E',
'limit': '1E',
},
{
'memory': '3Ei',
'limit': '2Ei',
},
{
'memory': '20P',
'limit': '2P',
},
{
'memory': '2P',
'limit': '1999T',
},
{
'memory': '3P',
'limit': '2000T',
},
{
'memory': '25Pi',
'limit': '24Pi',
},
{
'memory': '1Pi',
'limit': '1023Ti',
},
{
'memory': '14T',
'limit': '4T',
},
{
'memory': '4T',
'limit': '3999G',
},
{
'memory': '1P',
'limit': '999T',
},
{
'memory': '1Ti',
'limit': '999Gi',
},
{
'memory': '14G',
'limit': '9G',
},
{
'memory': '1G',
'limit': '999999K',
},
{
'memory': '1Gi',
'limit': '1000M',
},
{
'memory': '10Gi',
'limit': '9Gi',
},
{
'memory': '15M',
'limit': '5M',
},
{
'memory': '5Mi',
'limit': '5M',
},
{
'memory': '95Mi',
'limit': '94Mi',
},
{
'memory': '6K',
'limit': '5K',
},
{
'memory': '100Ki',
'limit': '65Ki',
},
{
'memory': '1Mi',
'limit': '10Ki',
},
{
'memory': '7001',
'limit': '7000',
},
)
def test_set_memory_request_greater_than_limit_should_raise(
self, memory: str, limit: str):
task = pipeline_task.PipelineTask(
component_spec=structures.ComponentSpec.from_yaml_documents(
V2_YAML),
args={'input1': 'value'},
)
with self.assertRaisesRegex(
ValueError,
f'Requested memory: {memory} cannot be greater than memory limit: {limit}. '
'Check the set_memory_request and set_memory_limit parameters.'
):
task.set_memory_request(memory).set_memory_limit(limit)

@parameterized.parameters(
{
'memory': '1E',
'limit': '2E',
},
{
'memory': '55Ei',
'limit': '150Ei',
},
{
'memory': '2P',
'limit': '20P',
},
{
'memory': '3P',
'limit': '3000T',
},
{
'memory': '25Pi',
'limit': '25Pi',
},
{
'memory': '1Pi',
'limit': '1024Ti',
},
{
'memory': '4T',
'limit': '14T',
},
{
'memory': '4T',
'limit': '4000G',
},
{
'memory': '4T',
'limit': '1P',
},
{
'memory': '1Ti',
'limit': '1024Gi',
},
{
'memory': '4G',
'limit': '14G',
},
{
'memory': '1G',
'limit': '1000M',
},
{
'memory': '1Gi',
'limit': '1024Mi',
},
{
'memory': '45Gi',
'limit': '100Gi',
},
{
'memory': '5M',
'limit': '15M',
},
{
'memory': '5M',
'limit': '5Mi',
},
{
'memory': '95Mi',
'limit': '95Mi',
},
{
'memory': '6K',
'limit': '7K',
},
{
'memory': '65Ki',
'limit': '100Ki',
},
{
'memory': '10Ki',
'limit': '1Mi',
},
{
'memory': '7000',
'limit': '7001',
},
)
def test_set_memory_request_and_limit(self, memory: str, limit: str):
task = pipeline_task.PipelineTask(
component_spec=structures.ComponentSpec.from_yaml_documents(
V2_YAML),
args={'input1': 'value'},
)
task.set_memory_request(memory)
self.assertEqual(memory, task.container_spec.resources.memory_request)
task.set_memory_limit(limit)
self.assertEqual(limit, task.container_spec.resources.memory_limit)

@parameterized.parameters(
{
'memory': '1E',
Expand Down

0 comments on commit 15aef12

Please sign in to comment.