Skip to content

Commit 08e3941

Browse files
committed
squashme: patch session affinity properly
1 parent 1b5baaf commit 08e3941

File tree

4 files changed

+121
-33
lines changed

4 files changed

+121
-33
lines changed

components/renku_data_services/notebooks/api/amalthea_patches/general.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,17 @@ def session_affinity(server: UserServer) -> list[dict[str, Any]]:
5959
],
6060
}
6161
]
62-
default_preferred_selector_terms: list[dict[str, Any]] = server.config.sessions.affinity.get(
63-
"nodeAffinity", {}
64-
).get("preferredDuringSchedulingIgnoredDuringExecution", [])
65-
default_required_selector_terms: list[dict[str, Any]] = (
66-
server.config.sessions.affinity.get("nodeAffinity", {})
67-
.get("requiredDuringSchedulingIgnoredDuringExecution", {})
68-
.get("nodeSelectorTerms", [])
69-
)
62+
default_preferred_selector_terms: list[dict[str, Any]] = []
63+
default_required_selector_terms: list[dict[str, Any]] = []
64+
if server.config.sessions.affinity:
65+
default_preferred_selector_terms = server.config.sessions.affinity.get("nodeAffinity", {}).get(
66+
"preferredDuringSchedulingIgnoredDuringExecution", []
67+
)
68+
default_required_selector_terms = (
69+
server.config.sessions.affinity.get("nodeAffinity", {})
70+
.get("requiredDuringSchedulingIgnoredDuringExecution", {})
71+
.get("nodeSelectorTerms", [])
72+
)
7073
preferred_match_expressions: list[dict[str, str]] = []
7174
required_match_expressions: list[dict[str, str]] = []
7275
for affinity in server.server_options.node_affinities:

components/renku_data_services/notebooks/config/dynamic.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ class _SessionConfig:
395395
termination_warning_duration_seconds: int = 12 * 60 * 60
396396
image_default_workdir: str = "/home/jovyan"
397397
node_selector: dict[str, str] = field(default_factory=dict)
398-
affinity: dict[str, Any] = field(default_factory=dict)
398+
affinity: dict[str, Any] | None = None
399399
tolerations: list[dict[str, str]] = field(default_factory=list)
400400
init_containers: list[str] = field(
401401
default_factory=lambda: [
@@ -407,6 +407,11 @@ class _SessionConfig:
407407

408408
@classmethod
409409
def from_env(cls) -> Self:
410+
affinity = (
411+
yaml.safe_load(StringIO(os.environ["NB_SESSIONS__AFFINITY"]))
412+
if os.environ.get("NB_SESSIONS__AFFINITY") is not None
413+
else None
414+
)
410415
return cls(
411416
culling=_SessionCullingConfig.from_env(),
412417
git_proxy=_GitProxyConfig.from_env(),
@@ -423,7 +428,7 @@ def from_env(cls) -> Self:
423428
termination_warning_duration_seconds=_parse_value_as_int(os.environ.get("", 12 * 60 * 60)),
424429
image_default_workdir="/home/jovyan",
425430
node_selector=yaml.safe_load(StringIO(os.environ.get("NB_SESSIONS__NODE_SELECTOR", "{}"))),
426-
affinity=yaml.safe_load(StringIO(os.environ.get("NB_SESSIONS__AFFINITY", "{}"))),
431+
affinity=affinity,
427432
tolerations=yaml.safe_load(StringIO(os.environ.get("NB_SESSIONS__TOLERATIONS", "[]"))),
428433
)
429434

@@ -456,8 +461,8 @@ def _for_testing(cls) -> Self:
456461
)
457462

458463
@property
459-
def affinity_model(self) -> Affinity:
460-
return Affinity.model_validate(self.affinity)
464+
def affinity_model(self) -> Affinity | None:
465+
return Affinity.model_validate(self.affinity) if self.affinity is not None else None
461466

462467
@property
463468
def tolerations_model(self) -> list[Toleration]:

components/renku_data_services/notebooks/crs.py

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from __future__ import annotations
44

55
import re
6-
from collections.abc import Mapping
6+
from collections.abc import Mapping, Sequence
77
from datetime import datetime, timedelta
88
from typing import Any, cast, override
99
from urllib.parse import urlunparse
@@ -41,6 +41,7 @@
4141
ReconcileStrategy,
4242
RemoteSecretRef,
4343
RequiredDuringSchedulingIgnoredDuringExecution,
44+
RequiredDuringSchedulingIgnoredDuringExecutionItem,
4445
Size,
4546
State,
4647
Status,
@@ -57,8 +58,17 @@
5758
from renku_data_services.notebooks.cr_amalthea_session import Limits7 as LimitsStr
5859
from renku_data_services.notebooks.cr_amalthea_session import Location as SessionLocation
5960
from renku_data_services.notebooks.cr_amalthea_session import Model as _ASModel
61+
from renku_data_services.notebooks.cr_amalthea_session import (
62+
PreferredDuringSchedulingIgnoredDuringExecutionItem1 as PreferredPodAffinityItem,
63+
)
64+
from renku_data_services.notebooks.cr_amalthea_session import (
65+
PreferredDuringSchedulingIgnoredDuringExecutionItem2 as PreferredPodAntiAffinityItem,
66+
)
6067
from renku_data_services.notebooks.cr_amalthea_session import Requests6 as _Requests
6168
from renku_data_services.notebooks.cr_amalthea_session import Requests7 as RequestsStr
69+
from renku_data_services.notebooks.cr_amalthea_session import (
70+
RequiredDuringSchedulingIgnoredDuringExecutionItem1 as RequiredPodAntiAffinityItem,
71+
)
6272
from renku_data_services.notebooks.cr_amalthea_session import Resources3 as _Resources
6373
from renku_data_services.notebooks.cr_amalthea_session import Secret1 as SecretAsVolume
6474
from renku_data_services.notebooks.cr_amalthea_session import SecretRef as _SecretRef
@@ -407,12 +417,39 @@ class AmaltheaSessionV1Alpha1MetadataPatch(BaseCRD):
407417
annotations: dict[str, str | ResetType] | ResetType | None = None
408418

409419

420+
class NodeAffinityPatch(BaseCRD):
421+
"""Patch for the node affinity of a session."""
422+
423+
preferredDuringSchedulingIgnoredDuringExecution: (
424+
Sequence[PreferredDuringSchedulingIgnoredDuringExecutionItem] | None | ResetType
425+
) = None
426+
requiredDuringSchedulingIgnoredDuringExecution: (
427+
RequiredDuringSchedulingIgnoredDuringExecution | None | ResetType
428+
) = None
429+
430+
431+
class PodAffinityPatch(BaseCRD):
432+
"""Patch for the pod affinity of a session."""
433+
434+
preferredDuringSchedulingIgnoredDuringExecution: Sequence[PreferredPodAffinityItem] | None | ResetType = None
435+
requiredDuringSchedulingIgnoredDuringExecution: (
436+
Sequence[RequiredDuringSchedulingIgnoredDuringExecutionItem] | None | ResetType
437+
) = None
438+
439+
440+
class PodAntiAffinityPatch(BaseCRD):
441+
"""Patch for the pod anti affinity of a session."""
442+
443+
preferredDuringSchedulingIgnoredDuringExecution: Sequence[PreferredPodAntiAffinityItem] | None | ResetType = None
444+
requiredDuringSchedulingIgnoredDuringExecution: Sequence[RequiredPodAntiAffinityItem] | None | ResetType = None
445+
446+
410447
class AffinityPatch(BaseCRD):
411448
"""Patch for the affinity of a session."""
412449

413-
nodeAffinity: NodeAffinity | ResetType | None = None
414-
podAffinity: PodAffinity | ResetType | None = None
415-
podAntiAffinity: PodAntiAffinity | ResetType | None = None
450+
nodeAffinity: NodeAffinityPatch | ResetType | None = None
451+
podAffinity: PodAffinityPatch | ResetType | None = None
452+
podAntiAffinity: PodAntiAffinityPatch | ResetType | None = None
416453

417454

418455
class CullingPatch(CullingDurationParsingMixin):

components/renku_data_services/notebooks/utils.py

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
"""Utilities for notebooks."""
22

33
import renku_data_services.crc.models as crc_models
4-
from renku_data_services.base_models.core import RESET
4+
from renku_data_services.base_models.core import RESET, ResetType
5+
from renku_data_services.errors import errors
56
from renku_data_services.notebooks.crs import (
67
Affinity,
78
AffinityPatch,
89
MatchExpression,
910
NodeAffinity,
11+
NodeAffinityPatch,
1012
NodeSelectorTerm,
13+
PodAffinityPatch,
14+
PodAntiAffinityPatch,
1115
Preference,
1216
PreferredDuringSchedulingIgnoredDuringExecutionItem,
1317
RequiredDuringSchedulingIgnoredDuringExecution,
@@ -85,10 +89,10 @@ def intersect_node_affinities(
8589

8690
def node_affinity_from_resource_class(
8791
resource_class: crc_models.ResourceClass,
88-
default_affinity: Affinity,
89-
) -> Affinity:
92+
default_affinity: Affinity | None,
93+
) -> Affinity | None:
9094
"""Generate an affinity from the affinities stored in a resource class."""
91-
rc_node_affinity = NodeAffinity()
95+
rc_node_affinity: NodeAffinity | None = None
9296
required_expr = [
9397
MatchExpression(key=affinity.key, operator="Exists")
9498
for affinity in resource_class.node_affinities
@@ -100,6 +104,7 @@ def node_affinity_from_resource_class(
100104
if not affinity.required_during_scheduling
101105
]
102106
if required_expr:
107+
rc_node_affinity = NodeAffinity()
103108
rc_node_affinity.requiredDuringSchedulingIgnoredDuringExecution = (
104109
RequiredDuringSchedulingIgnoredDuringExecution(
105110
nodeSelectorTerms=[
@@ -112,6 +117,8 @@ def node_affinity_from_resource_class(
112117
)
113118
)
114119
if preferred_expr:
120+
if not rc_node_affinity:
121+
rc_node_affinity = NodeAffinity()
115122
rc_node_affinity.preferredDuringSchedulingIgnoredDuringExecution = [
116123
PreferredDuringSchedulingIgnoredDuringExecutionItem(
117124
weight=1,
@@ -122,24 +129,60 @@ def node_affinity_from_resource_class(
122129
)
123130
]
124131

125-
affinity = default_affinity.model_copy(deep=True)
126-
if affinity.nodeAffinity:
127-
affinity.nodeAffinity = intersect_node_affinities(affinity.nodeAffinity, rc_node_affinity)
128-
else:
129-
affinity.nodeAffinity = rc_node_affinity
130-
return affinity
132+
match (default_affinity, rc_node_affinity):
133+
case (None, None):
134+
return None
135+
case (None, NodeAffinity()):
136+
return Affinity(nodeAffinity=rc_node_affinity)
137+
case (Affinity(), None):
138+
return default_affinity
139+
case (Affinity(), NodeAffinity()):
140+
affinity = default_affinity.model_copy(deep=True)
141+
if affinity.nodeAffinity:
142+
affinity.nodeAffinity = intersect_node_affinities(affinity.nodeAffinity, rc_node_affinity)
143+
else:
144+
affinity.nodeAffinity = rc_node_affinity
145+
return affinity
146+
case _:
147+
raise errors.ProgrammingError(message="Cannot derive node affinity from resource class and defaults.")
131148

132149

133150
def node_affinity_patch_from_resource_class(
134-
resource_class: crc_models.ResourceClass, default_affinity: Affinity
135-
) -> AffinityPatch:
151+
resource_class: crc_models.ResourceClass, default_affinity: Affinity | None
152+
) -> AffinityPatch | ResetType:
136153
"""Create a patch for the session affinity."""
137154
affinity = node_affinity_from_resource_class(resource_class, default_affinity)
138-
return AffinityPatch(
139-
nodeAffinity=affinity.nodeAffinity or RESET,
140-
podAffinity=affinity.podAffinity or RESET,
141-
podAntiAffinity=affinity.podAntiAffinity or RESET,
142-
)
155+
if not affinity:
156+
return RESET
157+
patch = AffinityPatch(nodeAffinity=RESET, podAffinity=RESET, podAntiAffinity=RESET)
158+
if affinity.nodeAffinity:
159+
patch.nodeAffinity = NodeAffinityPatch(
160+
preferredDuringSchedulingIgnoredDuringExecution=(
161+
affinity.nodeAffinity.preferredDuringSchedulingIgnoredDuringExecution or RESET
162+
),
163+
requiredDuringSchedulingIgnoredDuringExecution=(
164+
affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution or RESET
165+
),
166+
)
167+
if affinity.podAffinity:
168+
patch.podAffinity = PodAffinityPatch(
169+
preferredDuringSchedulingIgnoredDuringExecution=(
170+
affinity.podAffinity.preferredDuringSchedulingIgnoredDuringExecution or RESET
171+
),
172+
requiredDuringSchedulingIgnoredDuringExecution=(
173+
affinity.podAffinity.requiredDuringSchedulingIgnoredDuringExecution or RESET
174+
),
175+
)
176+
if affinity.podAntiAffinity:
177+
patch.podAntiAffinity = PodAntiAffinityPatch(
178+
preferredDuringSchedulingIgnoredDuringExecution=(
179+
affinity.podAntiAffinity.preferredDuringSchedulingIgnoredDuringExecution or RESET
180+
),
181+
requiredDuringSchedulingIgnoredDuringExecution=(
182+
affinity.podAntiAffinity.requiredDuringSchedulingIgnoredDuringExecution or RESET
183+
),
184+
)
185+
return patch
143186

144187

145188
def tolerations_from_resource_class(

0 commit comments

Comments
 (0)