diff --git a/packages/aws-library/src/aws_library/ec2/client.py b/packages/aws-library/src/aws_library/ec2/client.py index f4b5436bee9..efdd22c3642 100644 --- a/packages/aws-library/src/aws_library/ec2/client.py +++ b/packages/aws-library/src/aws_library/ec2/client.py @@ -90,9 +90,13 @@ async def get_ec2_instance_capabilities( list_instances.append( EC2InstanceType( name=instance["InstanceType"], - cpus=instance["VCpuInfo"]["DefaultVCpus"], - ram=ByteSize( - int(instance["MemoryInfo"]["SizeInMiB"]) * 1024 * 1024 + resources=Resources( + cpus=instance["VCpuInfo"]["DefaultVCpus"], + ram=ByteSize( + int(instance["MemoryInfo"]["SizeInMiB"]) + * 1024 + * 1024 + ), ), ) ) @@ -173,9 +177,7 @@ async def start_aws_instance( tags=parse_obj_as( EC2Tags, {tag["Key"]: tag["Value"] for tag in instance["Tags"]} ), - resources=Resources( - cpus=instance_config.type.cpus, ram=instance_config.type.ram - ), + resources=instance_config.type.resources, ) for instance in instances["Reservations"][0]["Instances"] ] @@ -234,10 +236,7 @@ async def get_instances( else None, type=instance["InstanceType"], state=instance["State"]["Name"], - resources=Resources( - cpus=ec2_instance_types[0].cpus, - ram=ec2_instance_types[0].ram, - ), + resources=ec2_instance_types[0].resources, tags=parse_obj_as( EC2Tags, {tag["Key"]: tag["Value"] for tag in instance["Tags"]}, diff --git a/packages/aws-library/src/aws_library/ec2/models.py b/packages/aws-library/src/aws_library/ec2/models.py index 9b67359c915..79c73e89fae 100644 --- a/packages/aws-library/src/aws_library/ec2/models.py +++ b/packages/aws-library/src/aws_library/ec2/models.py @@ -13,13 +13,12 @@ Extra, Field, NonNegativeFloat, - PositiveInt, validator, ) from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType -class Resources(BaseModel): +class Resources(BaseModel, frozen=True): cpus: NonNegativeFloat ram: ByteSize @@ -53,12 +52,16 @@ def __sub__(self, other: "Resources") -> "Resources": } ) + @validator("cpus", pre=True) + @classmethod + def _floor_cpus_to_0(cls, v: float) -> float: + return max(v, 0) -@dataclass(frozen=True) + +@dataclass(frozen=True, kw_only=True, slots=True) class EC2InstanceType: name: InstanceTypeType - cpus: PositiveInt - ram: ByteSize + resources: Resources InstancePrivateDNSName: TypeAlias = str diff --git a/packages/models-library/src/models_library/api_schemas_clusters_keeper/ec2_instances.py b/packages/models-library/src/models_library/api_schemas_clusters_keeper/ec2_instances.py index 16f47d2a3fd..057c02e1815 100644 --- a/packages/models-library/src/models_library/api_schemas_clusters_keeper/ec2_instances.py +++ b/packages/models-library/src/models_library/api_schemas_clusters_keeper/ec2_instances.py @@ -1,10 +1,10 @@ from dataclasses import dataclass -from pydantic import ByteSize, PositiveInt +from pydantic import ByteSize, NonNegativeFloat @dataclass(frozen=True) class EC2InstanceTypeGet: name: str - cpus: PositiveInt + cpus: NonNegativeFloat ram: ByteSize diff --git a/packages/pytest-simcore/src/pytest_simcore/dask_scheduler.py b/packages/pytest-simcore/src/pytest_simcore/dask_scheduler.py index c3af235c220..54019faca11 100644 --- a/packages/pytest-simcore/src/pytest_simcore/dask_scheduler.py +++ b/packages/pytest-simcore/src/pytest_simcore/dask_scheduler.py @@ -75,7 +75,7 @@ async def dask_spec_local_cluster( scheduler_address = URL(cluster.scheduler_address) monkeypatch.setenv( "COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL", - f"{scheduler_address}" or "invalid", + f"{scheduler_address or 'invalid'}", ) yield 
cluster @@ -95,7 +95,7 @@ async def dask_local_cluster_without_workers( scheduler_address = URL(cluster.scheduler_address) monkeypatch.setenv( "COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL", - f"{scheduler_address}" or "invalid", + f"{scheduler_address or 'invalid'}", ) yield cluster diff --git a/services/autoscaling/docker/entrypoint.sh b/services/autoscaling/docker/entrypoint.sh index 85ad74c8f13..7ae4d22910a 100755 --- a/services/autoscaling/docker/entrypoint.sh +++ b/services/autoscaling/docker/entrypoint.sh @@ -71,8 +71,8 @@ if [ "${SC_BUILD_TARGET}" = "development" ]; then fi if [ "${SC_BOOT_MODE}" = "debug-ptvsd" ]; then - # NOTE: production does NOT pre-installs ptvsd - pip install --no-cache-dir ptvsd + # NOTE: production does NOT pre-install debugpy + pip install --no-cache-dir debugpy fi # Appends docker group if socket is mounted diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/application.py b/services/autoscaling/src/simcore_service_autoscaling/core/application.py index 7f07c67c110..6bd496f0798 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/application.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/application.py @@ -1,6 +1,7 @@ import logging from fastapi import FastAPI +from models_library.basic_types import BootModeEnum from servicelib.fastapi.prometheus_instrumentation import ( setup_prometheus_instrumentation, ) @@ -21,6 +22,7 @@ from ..modules.ec2 import setup as setup_ec2 from ..modules.rabbitmq import setup as setup_rabbitmq from ..modules.redis import setup as setup_redis +from ..modules.remote_debug import setup_remote_debugging from .settings import ApplicationSettings logger = logging.getLogger(__name__) @@ -46,6 +48,8 @@ def create_app(settings: ApplicationSettings) -> FastAPI: setup_prometheus_instrumentation(app) # PLUGINS SETUP + if settings.SC_BOOT_MODE == BootModeEnum.DEBUG: + setup_remote_debugging(app) setup_api_routes(app) setup_docker(app) setup_rabbitmq(app) diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py index 989e9cff3a6..352c3731e42 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py @@ -8,6 +8,7 @@ BootModeEnum, BuildTargetEnum, LogLevel, + PortInt, VersionTag, ) from models_library.docker import DockerLabelKey @@ -182,6 +183,7 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): AUTOSCALING_DEBUG: bool = Field( default=False, description="Debug mode", env=["AUTOSCALING_DEBUG", "DEBUG"] ) + AUTOSCALING_REMOTE_DEBUG_PORT: PortInt = PortInt(3000) AUTOSCALING_LOGLEVEL: LogLevel = Field( LogLevel.INFO, env=["AUTOSCALING_LOGLEVEL", "LOG_LEVEL", "LOGLEVEL"] diff --git a/services/autoscaling/src/simcore_service_autoscaling/models.py b/services/autoscaling/src/simcore_service_autoscaling/models.py index de2df37d83c..f87398de0f9 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/models.py +++ b/services/autoscaling/src/simcore_service_autoscaling/models.py @@ -6,43 +6,68 @@ from models_library.generated_models.docker_rest_api import Node -@dataclass(frozen=True, kw_only=True) -class AssignedTasksToInstance: - instance: EC2InstanceData - available_resources: Resources - assigned_tasks: list +@dataclass(frozen=True, slots=True, kw_only=True) +class _TaskAssignmentMixin: + assigned_tasks: list = field(default_factory=list) + available_resources: Resources 
= field(default_factory=Resources.create_as_empty) + def assign_task(self, task, task_resources: Resources) -> None: + self.assigned_tasks.append(task) + object.__setattr__( + self, "available_resources", self.available_resources - task_resources + ) -@dataclass(frozen=True, kw_only=True) -class AssignedTasksToInstanceType: + def has_resources_for_task(self, task_resources: Resources) -> bool: + return bool(self.available_resources >= task_resources) + + +@dataclass(frozen=True, kw_only=True, slots=True) +class AssignedTasksToInstanceType(_TaskAssignmentMixin): instance_type: EC2InstanceType - assigned_tasks: list -@dataclass(frozen=True) -class AssociatedInstance: - node: Node +@dataclass(frozen=True, kw_only=True, slots=True) +class _BaseInstance(_TaskAssignmentMixin): ec2_instance: EC2InstanceData + def __post_init__(self) -> None: + if self.available_resources == Resources.create_as_empty(): + object.__setattr__(self, "available_resources", self.ec2_instance.resources) + + +@dataclass(frozen=True, kw_only=True, slots=True) +class AssociatedInstance(_BaseInstance): + node: Node + + +@dataclass(frozen=True, kw_only=True, slots=True) +class NonAssociatedInstance(_BaseInstance): + ... -@dataclass(frozen=True) + +@dataclass(frozen=True, kw_only=True, slots=True) class Cluster: active_nodes: list[AssociatedInstance] = field( metadata={ - "description": "This is a EC2 backed docker node which is active (with running tasks)" + "description": "This is an EC2-backed docker node which is active and ready to receive tasks (or with running tasks)" + } + ) + pending_nodes: list[AssociatedInstance] = field( + metadata={ + "description": "This is an EC2-backed docker node which is active and NOT yet ready to receive tasks" } ) drained_nodes: list[AssociatedInstance] = field( metadata={ - "description": "This is a EC2 backed docker node which is drained (with no tasks)" + "description": "This is an EC2-backed docker node which is drained (cannot accept tasks)" } ) reserve_drained_nodes: list[AssociatedInstance] = field( metadata={ - "description": "This is a EC2 backed docker node which is drained in the reserve if this is enabled (with no tasks)" + "description": "This is an EC2-backed docker node which is drained in the reserve if this is enabled (with no tasks)" } ) - pending_ec2s: list[EC2InstanceData] = field( + pending_ec2s: list[NonAssociatedInstance] = field( metadata={ "description": "This is an EC2 instance that is not yet associated to a docker node" } @@ -54,6 +79,23 @@ class Cluster: ) terminated_instances: list[EC2InstanceData] + def can_scale_down(self) -> bool: + return bool( + self.active_nodes + or self.pending_nodes + or self.drained_nodes + or self.pending_ec2s + ) + + def total_number_of_machines(self) -> int: + return ( + len(self.active_nodes) + + len(self.pending_nodes) + + len(self.drained_nodes) + + len(self.reserve_drained_nodes) + + len(self.pending_ec2s) + ) + DaskTaskId: TypeAlias = str diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py index d9ded9d9150..63a31cfa195 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py @@ -3,8 +3,9 @@ import dataclasses import datetime import itertools +import json import logging -from typing import cast +from typing import Final, cast import arrow from aws_library.ec2.models import ( @@ 
-15,16 +16,13 @@ Resources, ) from fastapi import FastAPI -from models_library.generated_models.docker_rest_api import ( - Availability, - Node, - NodeState, -) -from servicelib.logging_utils import log_catch +from fastapi.encoders import jsonable_encoder +from models_library.generated_models.docker_rest_api import Node, NodeState +from servicelib.logging_utils import log_catch, log_context +from servicelib.utils_formatting import timedelta_as_minute_second from types_aiobotocore_ec2.literals import InstanceTypeType from ..core.errors import ( - DaskWorkerNotFoundError, Ec2InstanceInvalidError, Ec2InstanceNotFoundError, Ec2InvalidDnsNameError, @@ -32,16 +30,15 @@ ) from ..core.settings import ApplicationSettings, get_application_settings from ..models import ( - AssignedTasksToInstance, AssignedTasksToInstanceType, AssociatedInstance, Cluster, + NonAssociatedInstance, ) from ..utils import utils_docker, utils_ec2 from ..utils.auto_scaling_core import ( associate_ec2_instances_with_nodes, ec2_startup_script, - filter_by_task_defined_instance, find_selected_instance_type_for_task, node_host_name_from_ec2_private_dns, ) @@ -78,49 +75,64 @@ async def _analyze_current_cluster( docker_nodes, existing_ec2_instances ) - def _is_node_up_and_available(node: Node, availability: Availability) -> bool: - assert node.Status # nosec - assert node.Spec # nosec - return bool( - node.Status.State == NodeState.ready - and node.Spec.Availability == availability - ) - def _node_not_ready(node: Node) -> bool: assert node.Status # nosec return bool(node.Status.State != NodeState.ready) - all_drained_nodes = [ - i - for i in attached_ec2s - if _is_node_up_and_available(i.node, Availability.drain) - ] + active_nodes, pending_nodes, all_drained_nodes = [], [], [] + for instance in attached_ec2s: + if await auto_scaling_mode.is_instance_active(app, instance): + node_used_resources = await auto_scaling_mode.compute_node_used_resources( + app, instance + ) + active_nodes.append( + dataclasses.replace( + instance, + available_resources=instance.ec2_instance.resources + - node_used_resources, + ) + ) + elif auto_scaling_mode.is_instance_drained(instance): + all_drained_nodes.append(instance) + else: + pending_nodes.append(instance) cluster = Cluster( - active_nodes=[ - i - for i in attached_ec2s - if _is_node_up_and_available(i.node, Availability.active) - ], + active_nodes=active_nodes, + pending_nodes=pending_nodes, drained_nodes=all_drained_nodes[ app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER : ], reserve_drained_nodes=all_drained_nodes[ : app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER ], - pending_ec2s=pending_ec2s, + pending_ec2s=[NonAssociatedInstance(ec2_instance=i) for i in pending_ec2s], terminated_instances=terminated_ec2_instances, disconnected_nodes=[n for n in docker_nodes if _node_not_ready(n)], ) - _logger.info("current state: %s", f"{cluster=}") + _logger.debug( + "current state: %s", + f"{json.dumps(jsonable_encoder(cluster, include={'active_nodes', 'pending_nodes', 'drained_nodes', 'reserve_drained_nodes', 'pending_ec2s'}), indent=2)}", + ) return cluster +_DELAY_FOR_REMOVING_DISCONNECTED_NODES_S: Final[int] = 30 + + async def _cleanup_disconnected_nodes(app: FastAPI, cluster: Cluster) -> Cluster: - if cluster.disconnected_nodes: - await utils_docker.remove_nodes( - get_docker_client(app), nodes=cluster.disconnected_nodes + utc_now = arrow.utcnow().datetime + removeable_nodes = [ + node + for node in cluster.disconnected_nodes + if node.UpdatedAt + and ( + 
(utc_now - arrow.get(node.UpdatedAt).datetime).total_seconds() + > _DELAY_FOR_REMOVING_DISCONNECTED_NODES_S ) + ] + if removeable_nodes: + await utils_docker.remove_nodes(get_docker_client(app), nodes=removeable_nodes) return dataclasses.replace(cluster, disconnected_nodes=[]) @@ -129,12 +141,14 @@ async def _try_attach_pending_ec2s( ) -> Cluster: """label the drained instances that connected to the swarm which are missing the monitoring labels""" new_found_instances: list[AssociatedInstance] = [] - still_pending_ec2s: list[EC2InstanceData] = [] + still_pending_ec2s: list[NonAssociatedInstance] = [] app_settings = get_application_settings(app) assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec for instance_data in cluster.pending_ec2s: try: - node_host_name = node_host_name_from_ec2_private_dns(instance_data) + node_host_name = node_host_name_from_ec2_private_dns( + instance_data.ec2_instance + ) if new_node := await utils_docker.find_node_with_name( get_docker_client(app), node_host_name ): @@ -142,10 +156,19 @@ async def _try_attach_pending_ec2s( new_node = await utils_docker.tag_node( get_docker_client(app), new_node, - tags=auto_scaling_mode.get_new_node_docker_tags(app, instance_data), + tags=auto_scaling_mode.get_new_node_docker_tags( + app, instance_data.ec2_instance + ), available=False, ) - new_found_instances.append(AssociatedInstance(new_node, instance_data)) + new_found_instances.append( + AssociatedInstance( + node=new_node, ec2_instance=instance_data.ec2_instance + ) + ) + _logger.info( + "Attached new EC2 instance %s", instance_data.ec2_instance.id + ) else: still_pending_ec2s.append(instance_data) except Ec2InvalidDnsNameError: # noqa: PERF203 @@ -197,65 +220,44 @@ async def _activate_and_notify( app: FastAPI, auto_scaling_mode: BaseAutoscaling, drained_node: AssociatedInstance, - tasks: list, -) -> list: +) -> None: await asyncio.gather( utils_docker.set_node_availability( get_docker_client(app), drained_node.node, available=True ), auto_scaling_mode.log_message_from_tasks( app, - tasks, + drained_node.assigned_tasks, "cluster adjusted, service should start shortly...", level=logging.INFO, ), - auto_scaling_mode.progress_message_from_tasks(app, tasks, progress=1.0), + auto_scaling_mode.progress_message_from_tasks( + app, drained_node.assigned_tasks, progress=1.0 + ), ) - return tasks async def _activate_drained_nodes( app: FastAPI, cluster: Cluster, - pending_tasks: list, auto_scaling_mode: BaseAutoscaling, -) -> tuple[list, Cluster]: - """returns the tasks that were assigned to the drained nodes""" - if not pending_tasks: - # nothing to do - return [], cluster - - activatable_nodes: list[tuple[AssociatedInstance, list]] = [ - ( - node, - [], - ) +) -> Cluster: + nodes_to_activate = [ + node for node in itertools.chain( cluster.drained_nodes, cluster.reserve_drained_nodes ) - ] - - still_pending_tasks = [ - task - for task in pending_tasks - if not auto_scaling_mode.try_assigning_task_to_node(task, activatable_nodes) - ] - - nodes_to_activate = [ - (node, assigned_tasks) - for node, assigned_tasks in activatable_nodes - if assigned_tasks + if node.assigned_tasks ] # activate these nodes now await asyncio.gather( *( - _activate_and_notify(app, auto_scaling_mode, node, tasks) - for node, tasks in nodes_to_activate + _activate_and_notify(app, auto_scaling_mode, node) + for node in nodes_to_activate ) ) - new_active_nodes = [node for node, _ in nodes_to_activate] - new_active_node_ids = {node.ec2_instance.id for node in new_active_nodes} + new_active_node_ids = 
{node.ec2_instance.id for node in nodes_to_activate} remaining_drained_nodes = [ node for node in cluster.drained_nodes @@ -266,151 +268,195 @@ async def _activate_drained_nodes( for node in cluster.reserve_drained_nodes if node.ec2_instance.id not in new_active_node_ids ] - return still_pending_tasks, dataclasses.replace( + return dataclasses.replace( cluster, - active_nodes=cluster.active_nodes + new_active_nodes, + active_nodes=cluster.active_nodes + nodes_to_activate, drained_nodes=remaining_drained_nodes, reserve_drained_nodes=remaining_reserved_drained_nodes, ) -async def _try_assign_tasks_to_instances( - app: FastAPI, +def _try_assign_task_to_ec2_instance( task, - auto_scaling_mode: BaseAutoscaling, - task_defined_ec2_type: InstanceTypeType | None, - active_instances_to_tasks: list[AssignedTasksToInstance], - pending_instances_to_tasks: list[AssignedTasksToInstance], - drained_instances_to_tasks: list[AssignedTasksToInstance], - needed_new_instance_types_for_tasks: list[AssignedTasksToInstanceType], + *, + instances: list[AssociatedInstance] | list[NonAssociatedInstance], + task_required_ec2_instance: InstanceTypeType | None, + task_required_resources: Resources, ) -> bool: - ( - filtered_active_instance_to_task, - filtered_pending_instance_to_task, - filtered_drained_instances_to_task, - filtered_needed_new_instance_types_to_task, - ) = filter_by_task_defined_instance( - task_defined_ec2_type, - active_instances_to_tasks, - pending_instances_to_tasks, - drained_instances_to_tasks, - needed_new_instance_types_for_tasks, - ) - # try to assign the task to one of the active, pending or net created instances - if ( - await auto_scaling_mode.try_assigning_task_to_instances( - app, - task, - filtered_active_instance_to_task, - notify_progress=False, - ) - or await auto_scaling_mode.try_assigning_task_to_instances( - app, - task, - filtered_pending_instance_to_task, - notify_progress=True, - ) - or await auto_scaling_mode.try_assigning_task_to_instances( - app, - task, - filtered_drained_instances_to_task, - notify_progress=False, + for instance in instances: + if task_required_ec2_instance and ( + task_required_ec2_instance != instance.ec2_instance.type + ): + continue + if instance.has_resources_for_task(task_required_resources): + instance.assign_task(task, task_required_resources) + _logger.debug( + "%s", + f"assigned task with {task_required_resources=}, {task_required_ec2_instance=} to " + f"{instance.ec2_instance.id=}:{instance.ec2_instance.type}, " + f"remaining resources:{instance.available_resources}/{instance.ec2_instance.resources}", + ) + return True + return False + + +def _try_assign_task_to_ec2_instance_type( + task, + *, + instances: list[AssignedTasksToInstanceType], + task_required_ec2_instance: InstanceTypeType | None, + task_required_resources: Resources, +) -> bool: + for instance in instances: + if task_required_ec2_instance and ( + task_required_ec2_instance != instance.instance_type.name + ): + continue + if instance.has_resources_for_task(task_required_resources): + instance.assign_task(task, task_required_resources) + _logger.debug( + "%s", + f"assigned task with {task_required_resources=}, {task_required_ec2_instance=} to " + f"{instance.instance_type}, " + f"remaining resources:{instance.available_resources}/{instance.instance_type.resources}", + ) + return True + return False + + +async def _assign_tasks_to_current_cluster( + app: FastAPI, + tasks: list, + cluster: Cluster, + auto_scaling_mode: BaseAutoscaling, +) -> tuple[list, Cluster]: + unassigned_tasks = 
[] + for task in tasks: + task_required_resources = auto_scaling_mode.get_task_required_resources(task) + task_required_ec2_instance = await auto_scaling_mode.get_task_defined_instance( + app, task ) - or auto_scaling_mode.try_assigning_task_to_instance_types( - task, filtered_needed_new_instance_types_to_task + + assignment_functions = [ + lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance( + task, + instances=cluster.active_nodes, + task_required_ec2_instance=required_ec2, + task_required_resources=required_resources, + ), + lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance( + task, + instances=cluster.drained_nodes + cluster.reserve_drained_nodes, + task_required_ec2_instance=required_ec2, + task_required_resources=required_resources, + ), + lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance( + task, + instances=cluster.pending_nodes, + task_required_ec2_instance=required_ec2, + task_required_resources=required_resources, + ), + lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance( + task, + instances=cluster.pending_ec2s, + task_required_ec2_instance=required_ec2, + task_required_resources=required_resources, + ), + ] + + if any( + assignment(task, task_required_ec2_instance, task_required_resources) + for assignment in assignment_functions + ): + _logger.debug("assigned task to cluster") + else: + unassigned_tasks.append(task) + + if unassigned_tasks: + _logger.info( + "the current cluster should cope with %s tasks, %s are unassigned/queued tasks", + len(tasks) - len(unassigned_tasks), + len(unassigned_tasks), ) - ): - return True - return False + return unassigned_tasks, cluster async def _find_needed_instances( app: FastAPI, - pending_tasks: list, + unassigned_tasks: list, available_ec2_types: list[EC2InstanceType], cluster: Cluster, auto_scaling_mode: BaseAutoscaling, ) -> dict[EC2InstanceType, int]: # 1. 
check first the pending task needs - active_instances_to_tasks: list[AssignedTasksToInstance] = [ - AssignedTasksToInstance( - instance=i.ec2_instance, - assigned_tasks=[], - available_resources=i.ec2_instance.resources - - await auto_scaling_mode.compute_node_used_resources(app, i), - ) - for i in cluster.active_nodes - ] - pending_instances_to_tasks: list[AssignedTasksToInstance] = [ - AssignedTasksToInstance( - instance=i, assigned_tasks=[], available_resources=i.resources - ) - for i in cluster.pending_ec2s - ] - drained_instances_to_tasks: list[AssignedTasksToInstance] = [ - AssignedTasksToInstance( - instance=i.ec2_instance, - assigned_tasks=[], - available_resources=i.ec2_instance.resources, - ) - for i in cluster.drained_nodes - ] needed_new_instance_types_for_tasks: list[AssignedTasksToInstanceType] = [] - for task in pending_tasks: - task_defined_ec2_type = await auto_scaling_mode.get_task_defined_instance( - app, task - ) - _logger.info( - "task %s %s", - task, - f"defines ec2 type as {task_defined_ec2_type}" - if task_defined_ec2_type - else "does NOT define ec2 type", - ) - if await _try_assign_tasks_to_instances( - app, - task, - auto_scaling_mode, - task_defined_ec2_type, - active_instances_to_tasks, - pending_instances_to_tasks, - drained_instances_to_tasks, - needed_new_instance_types_for_tasks, - ): - continue + with log_context(_logger, logging.DEBUG, msg="finding needed instances"): + for task in unassigned_tasks: + task_required_resources = auto_scaling_mode.get_task_required_resources( + task + ) + task_required_ec2_instance = ( + await auto_scaling_mode.get_task_defined_instance(app, task) + ) - # so we need to find what we can create now - try: - # check if exact instance type is needed first - if task_defined_ec2_type: - defined_ec2 = find_selected_instance_type_for_task( - task_defined_ec2_type, available_ec2_types, auto_scaling_mode, task - ) - needed_new_instance_types_for_tasks.append( - AssignedTasksToInstanceType( - instance_type=defined_ec2, assigned_tasks=[task] + # first check if we can assign the task to one of the newly tobe created instances + if _try_assign_task_to_ec2_instance_type( + task, + instances=needed_new_instance_types_for_tasks, + task_required_ec2_instance=task_required_ec2_instance, + task_required_resources=task_required_resources, + ): + continue + + # so we need to find what we can create now + try: + # check if exact instance type is needed first + if task_required_ec2_instance: + defined_ec2 = find_selected_instance_type_for_task( + task_required_ec2_instance, + available_ec2_types, + auto_scaling_mode, + task, ) - ) - else: - # we go for best fitting type - best_ec2_instance = utils_ec2.find_best_fitting_ec2_instance( - available_ec2_types, - auto_scaling_mode.get_max_resources_from_task(task), - score_type=utils_ec2.closest_instance_policy, - ) - needed_new_instance_types_for_tasks.append( - AssignedTasksToInstanceType( - instance_type=best_ec2_instance, assigned_tasks=[task] + needed_new_instance_types_for_tasks.append( + AssignedTasksToInstanceType( + instance_type=defined_ec2, + assigned_tasks=[task], + available_resources=defined_ec2.resources + - task_required_resources, + ) + ) + else: + # we go for best fitting type + best_ec2_instance = utils_ec2.find_best_fitting_ec2_instance( + available_ec2_types, + auto_scaling_mode.get_task_required_resources(task), + score_type=utils_ec2.closest_instance_policy, ) + needed_new_instance_types_for_tasks.append( + AssignedTasksToInstanceType( + instance_type=best_ec2_instance, + 
assigned_tasks=[task], + available_resources=best_ec2_instance.resources + - task_required_resources, + ) + ) + except Ec2InstanceNotFoundError: + _logger.exception( + "Task %s needs more resources than any EC2 instance " + "can provide with the current configuration. Please check!", + f"{task}", ) - except Ec2InstanceNotFoundError: - _logger.exception( - "Task %s needs more resources than any EC2 instance " - "can provide with the current configuration. Please check!", - f"{task}", - ) - except Ec2InstanceInvalidError: - _logger.exception("Unexpected error:") + except Ec2InstanceInvalidError: + _logger.exception("Unexpected error:") + + _logger.info( + "found following needed instances: %s", + [ + f"{i.instance_type.name=}:{i.instance_type.resources} with {len(i.assigned_tasks)} tasks" + for i in needed_new_instance_types_for_tasks + ], + ) num_instances_per_type = collections.defaultdict( int, @@ -430,8 +476,8 @@ async def _find_needed_instances( ) > 0: # check if some are already pending remaining_pending_instances = [ - i.instance for i in pending_instances_to_tasks if not i.assigned_tasks - ] + i.ec2_instance for i in cluster.pending_ec2s if not i.assigned_tasks + ] + [i.ec2_instance for i in cluster.pending_nodes if not i.assigned_tasks] if len(remaining_pending_instances) < ( app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER - len(cluster.reserve_drained_nodes) @@ -574,7 +620,7 @@ async def _start_instances( "Exceptionally high load on computational cluster, please try again later.", level=logging.ERROR, ) - elif isinstance(r, Exception): + elif isinstance(r, BaseException): _logger.error("Unexpected error happened when starting EC2 instance: %s", r) last_issue = f"{r}" elif isinstance(r, list): @@ -603,7 +649,7 @@ async def _start_instances( async def _scale_up_cluster( app: FastAPI, cluster: Cluster, - pending_tasks: list, + unassigned_tasks: list, auto_scaling_mode: BaseAutoscaling, ) -> Cluster: app_settings: ApplicationSettings = app.state.settings @@ -614,48 +660,42 @@ async def _scale_up_cluster( # let's start these if needed_ec2_instances := await _find_needed_instances( - app, pending_tasks, allowed_instance_types, cluster, auto_scaling_mode + app, unassigned_tasks, allowed_instance_types, cluster, auto_scaling_mode ): await auto_scaling_mode.log_message_from_tasks( app, - pending_tasks, - "service is pending due to missing resources, scaling up cluster now\n" - f"{sum(n for n in needed_ec2_instances.values())} new machines will be added, please wait...", + unassigned_tasks, + "service is pending due to missing resources, scaling up cluster now...", level=logging.INFO, ) - # NOTE: notify the up-scaling progress started... 
- await auto_scaling_mode.progress_message_from_tasks(app, pending_tasks, 0.001) new_pending_instances = await _start_instances( - app, needed_ec2_instances, pending_tasks, auto_scaling_mode + app, needed_ec2_instances, unassigned_tasks, auto_scaling_mode + ) + cluster.pending_ec2s.extend( + [NonAssociatedInstance(ec2_instance=i) for i in new_pending_instances] ) - cluster.pending_ec2s.extend(new_pending_instances) # NOTE: to check the logs of UserData in EC2 instance # run: tail -f -n 1000 /var/log/cloud-init-output.log in the instance return cluster -async def _deactivate_empty_nodes( - app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling -) -> Cluster: +async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster: docker_client = get_docker_client(app) active_empty_instances: list[AssociatedInstance] = [] active_non_empty_instances: list[AssociatedInstance] = [] for instance in cluster.active_nodes: - try: - node_used_resources = await auto_scaling_mode.compute_node_used_resources( - app, - instance, - ) - if node_used_resources == Resources.create_as_empty(): - active_empty_instances.append(instance) - else: - active_non_empty_instances.append(instance) - except DaskWorkerNotFoundError: # noqa: PERF203 - _logger.exception( - "EC2 node instance is not registered to dask-scheduler! TIP: Needs investigation" - ) + if instance.available_resources == instance.ec2_instance.resources: + active_empty_instances.append(instance) + else: + active_non_empty_instances.append(instance) + if not active_empty_instances: + return cluster + _logger.info( + "following nodes will be drained: '%s'", + f"{[instance.node.Description.Hostname for instance in active_empty_instances if instance.node.Description]}", + ) # drain this empty nodes updated_nodes: list[Node] = await asyncio.gather( *( @@ -673,7 +713,7 @@ async def _deactivate_empty_nodes( f"{[node.Description.Hostname for node in updated_nodes if node.Description]}", ) newly_drained_instances = [ - AssociatedInstance(node, instance.ec2_instance) + AssociatedInstance(node=node, ec2_instance=instance.ec2_instance) for instance, node in zip(active_empty_instances, updated_nodes, strict=True) ] return dataclasses.replace( @@ -712,7 +752,7 @@ async def _find_terminateable_instances( _logger.info( "%s has still %ss before being terminateable", f"{instance.ec2_instance.id=}", - f"{(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION - elapsed_time_since_drained).total_seconds()}", + f"{(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION - elapsed_time_since_drained).total_seconds():.0f}", ) if terminateable_nodes: @@ -755,8 +795,55 @@ async def _try_scale_down_cluster(app: FastAPI, cluster: Cluster) -> Cluster: terminated_instances=cluster.terminated_instances + [i.ec2_instance for i in terminateable_instances], ) - # 3. we could ask on rabbit whether someone would like to keep that machine for something (like the agent for example), if that is the case, we wait another hour and ask again? - # 4. 
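# A minimal sketch (not part of the patch) of the progress math applied per launch
# time by the new _notify_based_on_machine_type helper below: progress is the
# fraction of the allowed startup window (EC2_INSTANCES_MAX_START_TIME) that has
# elapsed since the EC2 instance was launched. The helper name `startup_progress`
# is illustrative only.
import datetime


def startup_progress(
    launch_time: datetime.datetime, max_start_time: datetime.timedelta
) -> float:
    # elapsed fraction of the startup window; can exceed 1.0 for late machines
    now = datetime.datetime.now(datetime.timezone.utc)
    return (now - launch_time).total_seconds() / max_start_time.total_seconds()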
+ + +async def _notify_based_on_machine_type( + app: FastAPI, + instances: list[AssociatedInstance] | list[NonAssociatedInstance], + auto_scaling_mode: BaseAutoscaling, + *, + message: str, +) -> None: + app_settings = get_application_settings(app) + assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec + instance_max_time_to_start = ( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME + ) + launch_time_to_tasks: dict[datetime.datetime, list] = collections.defaultdict(list) + now = datetime.datetime.now(datetime.timezone.utc) + for instance in instances: + launch_time_to_tasks[ + instance.ec2_instance.launch_time + ] += instance.assigned_tasks + + for launch_time, tasks in launch_time_to_tasks.items(): + time_since_launch = now - launch_time + estimated_time_to_completion = launch_time + instance_max_time_to_start - now + msg = ( + f"{message} (time waiting: {timedelta_as_minute_second(time_since_launch)}," + f" est. remaining time: {timedelta_as_minute_second(estimated_time_to_completion)})...please wait..." + ) + if tasks: + await auto_scaling_mode.log_message_from_tasks( + app, tasks, message=msg, level=logging.INFO + ) + await auto_scaling_mode.progress_message_from_tasks( + app, + tasks, + progress=time_since_launch.total_seconds() + / instance_max_time_to_start.total_seconds(), + ) + + +async def _notify_machine_creation_progress( + app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling +) -> None: + await _notify_based_on_machine_type( + app, + cluster.pending_ec2s, + auto_scaling_mode, + message="waiting for machine to join cluster", + ) async def _autoscale_cluster( @@ -765,28 +852,43 @@ async def _autoscale_cluster( # 1. check if we have pending tasks and resolve them by activating some drained nodes unrunnable_tasks = await auto_scaling_mode.list_unrunnable_tasks(app) _logger.info("found %s unrunnable tasks", len(unrunnable_tasks)) - # 2. try to activate drained nodes to cover some of the tasks - still_unrunnable_tasks, cluster = await _activate_drained_nodes( - app, cluster, unrunnable_tasks, auto_scaling_mode - ) - _logger.info( - "still %s unrunnable tasks after node activation", len(still_unrunnable_tasks) + + queued_or_missing_instance_tasks, cluster = await _assign_tasks_to_current_cluster( + app, unrunnable_tasks, cluster, auto_scaling_mode ) + # 2. try to activate drained nodes to cover some of the tasks + cluster = await _activate_drained_nodes(app, cluster, auto_scaling_mode) # let's check if there are still pending tasks or if the reserve was used app_settings = get_application_settings(app) assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec - if still_unrunnable_tasks or ( + if queued_or_missing_instance_tasks or ( len(cluster.reserve_drained_nodes) < app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER ): - # yes? 
then scale up - cluster = await _scale_up_cluster( - app, cluster, still_unrunnable_tasks, auto_scaling_mode + if ( + cluster.total_number_of_machines() + < app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES + ): + _logger.info( + "%s unrunnable tasks could not be assigned, slowly trying to scale up...", + len(queued_or_missing_instance_tasks), + ) + cluster = await _scale_up_cluster( + app, cluster, queued_or_missing_instance_tasks, auto_scaling_mode + ) + + elif ( + len(queued_or_missing_instance_tasks) == len(unrunnable_tasks) == 0 + and cluster.can_scale_down() + ): + _logger.info( + "there is %s waiting task, slowly and gracefully scaling down...", + len(queued_or_missing_instance_tasks), ) - elif still_unrunnable_tasks == unrunnable_tasks: # NOTE: we only scale down in case we did not just scale up. The swarm needs some time to adjust - cluster = await _deactivate_empty_nodes(app, cluster, auto_scaling_mode) + await auto_scaling_mode.try_retire_nodes(app) + cluster = await _deactivate_empty_nodes(app, cluster) cluster = await _try_scale_down_cluster(app, cluster) return cluster @@ -829,6 +931,7 @@ async def auto_scale_cluster( cluster = await _analyze_current_cluster(app, auto_scaling_mode) cluster = await _cleanup_disconnected_nodes(app, cluster) cluster = await _try_attach_pending_ec2s(app, cluster, auto_scaling_mode) - cluster = await _autoscale_cluster(app, cluster, auto_scaling_mode) + cluster = await _autoscale_cluster(app, cluster, auto_scaling_mode) + await _notify_machine_creation_progress(app, cluster, auto_scaling_mode) await _notify_autoscaling_status(app, cluster, auto_scaling_mode) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py index d375a511a5e..070dad548f9 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py @@ -4,15 +4,13 @@ from aws_library.ec2.models import EC2InstanceData, EC2Tags, Resources from fastapi import FastAPI from models_library.docker import DockerLabelKey +from models_library.generated_models.docker_rest_api import Availability from models_library.generated_models.docker_rest_api import Node as DockerNode from servicelib.logging_utils import LogLevelInt from types_aiobotocore_ec2.literals import InstanceTypeType -from ..models import ( - AssignedTasksToInstance, - AssignedTasksToInstanceType, - AssociatedInstance, -) +from ..models import AssociatedInstance +from ..utils import utils_docker @dataclass @@ -39,32 +37,6 @@ def get_new_node_docker_tags( async def list_unrunnable_tasks(app: FastAPI) -> list: ... - @staticmethod - @abstractmethod - def try_assigning_task_to_node( - task, instances_to_tasks: list[tuple[AssociatedInstance, list]] - ) -> bool: - ... - - @staticmethod - @abstractmethod - async def try_assigning_task_to_instances( - app: FastAPI, - pending_task, - instances_to_tasks: list[AssignedTasksToInstance], - *, - notify_progress: bool - ) -> bool: - ... - - @staticmethod - @abstractmethod - def try_assigning_task_to_instance_types( - pending_task, - instance_types_to_tasks: list[AssignedTasksToInstanceType], - ) -> bool: - ... 
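# The per-mode try_assigning_* hooks removed above are superseded by the shared
# first-fit assignment in auto_scaling_core, built on the _TaskAssignmentMixin API
# (has_resources_for_task / assign_task) introduced earlier in this patch. A
# condensed sketch of that flow; the `HasRoom` protocol and `first_fit` helper are
# illustrative stand-ins for AssociatedInstance/NonAssociatedInstance and
# _try_assign_task_to_ec2_instance (the per-task EC2-type filter is omitted):
from typing import Any, Protocol


class HasRoom(Protocol):
    def has_resources_for_task(self, task_resources: Any) -> bool: ...

    def assign_task(self, task: Any, task_resources: Any) -> None: ...


def first_fit(task: Any, task_resources: Any, instances: list[HasRoom]) -> bool:
    # assign the task to the first instance that still has room for it
    for instance in instances:
        if instance.has_resources_for_task(task_resources):
            instance.assign_task(task, task_resources)
            return True
    return False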
- @staticmethod @abstractmethod async def log_message_from_tasks( @@ -81,7 +53,7 @@ async def progress_message_from_tasks( @staticmethod @abstractmethod - def get_max_resources_from_task(task) -> Resources: + def get_task_required_resources(task) -> Resources: ... @staticmethod @@ -109,3 +81,19 @@ async def compute_cluster_total_resources( app: FastAPI, instances: list[AssociatedInstance] ) -> Resources: ... + + @staticmethod + @abstractmethod + async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool: + ... + + @staticmethod + def is_instance_drained(instance: AssociatedInstance) -> bool: + return utils_docker.is_node_ready_and_available( + instance.node, Availability.drain + ) + + @staticmethod + @abstractmethod + async def try_retire_nodes(app: FastAPI) -> None: + ... diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py index 8ad53e676e9..2feeb12bdbe 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py @@ -1,6 +1,6 @@ import collections import logging -from collections.abc import Iterable +from typing import cast from aws_library.ec2.models import EC2InstanceData, EC2Tags, Resources from fastapi import FastAPI @@ -8,19 +8,19 @@ DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY, DockerLabelKey, ) -from models_library.generated_models.docker_rest_api import Node +from models_library.generated_models.docker_rest_api import Availability, Node from pydantic import AnyUrl, ByteSize from servicelib.logging_utils import LogLevelInt from servicelib.utils import logged_gather from types_aiobotocore_ec2.literals import InstanceTypeType -from ..core.settings import get_application_settings -from ..models import ( - AssignedTasksToInstance, - AssignedTasksToInstanceType, - AssociatedInstance, - DaskTask, +from ..core.errors import ( + DaskNoWorkersError, + DaskSchedulerNotFoundError, + DaskWorkerNotFoundError, ) +from ..core.settings import get_application_settings +from ..models import AssociatedInstance, DaskTask from ..utils import computational_scaling as utils from ..utils import utils_docker, utils_ec2 from . 
import dask @@ -57,75 +57,66 @@ def get_new_node_docker_tags( @staticmethod async def list_unrunnable_tasks(app: FastAPI) -> list[DaskTask]: - return await dask.list_unrunnable_tasks(_scheduler_url(app)) - - @staticmethod - def try_assigning_task_to_node( - task: DaskTask, - instances_to_tasks: Iterable[tuple[AssociatedInstance, list[DaskTask]]], - ) -> bool: - return utils.try_assigning_task_to_node(task, instances_to_tasks) - - @staticmethod - async def try_assigning_task_to_instances( - app: FastAPI, - pending_task, - instances_to_tasks: list[AssignedTasksToInstance], - *, - notify_progress: bool - ) -> bool: - return await utils.try_assigning_task_to_instances( - app, - pending_task, - instances_to_tasks, - notify_progress=notify_progress, - ) - - @staticmethod - def try_assigning_task_to_instance_types( - pending_task, - instance_types_to_tasks: list[AssignedTasksToInstanceType], - ) -> bool: - return utils.try_assigning_task_to_instance_types( - pending_task, instance_types_to_tasks - ) + try: + unrunnable_tasks = await dask.list_unrunnable_tasks(_scheduler_url(app)) + # NOTE: any worker "processing" more than 1 task means that the other tasks are queued! + processing_tasks_by_worker = await dask.list_processing_tasks_per_worker( + _scheduler_url(app) + ) + queued_tasks = [] + for tasks in processing_tasks_by_worker.values(): + queued_tasks += tasks[1:] + _logger.debug( + "found %s unrunnable tasks and %s potentially queued tasks", + len(unrunnable_tasks), + len(queued_tasks), + ) + return unrunnable_tasks + queued_tasks + except DaskSchedulerNotFoundError: + _logger.warning( + "No dask scheduler found. TIP: Normal during machine startup." + ) + return [] @staticmethod async def log_message_from_tasks( app: FastAPI, tasks: list, message: str, *, level: LogLevelInt ) -> None: assert app # nosec - assert tasks # nosec + assert tasks is not None # nosec _logger.log(level, "LOG: %s", message) @staticmethod async def progress_message_from_tasks(app: FastAPI, tasks: list, progress: float): assert app # nosec - assert tasks # nosec - _logger.info("PROGRESS: %s", progress) + assert tasks is not None # nosec + _logger.info("PROGRESS: %s", f"{progress:.2f}") @staticmethod - def get_max_resources_from_task(task) -> Resources: - return utils.get_max_resources_from_dask_task(task) + def get_task_required_resources(task) -> Resources: + return utils.resources_from_dask_task(task) @staticmethod async def get_task_defined_instance(app: FastAPI, task) -> InstanceTypeType | None: assert app # nosec - return utils.get_task_instance_restriction(task) + return cast(InstanceTypeType | None, utils.get_task_instance_restriction(task)) @staticmethod async def compute_node_used_resources( app: FastAPI, instance: AssociatedInstance ) -> Resources: - num_results_in_memory = await dask.get_worker_still_has_results_in_memory( - _scheduler_url(app), instance.ec2_instance - ) - if num_results_in_memory > 0: - # NOTE: this is a trick to consider the node still useful - return Resources(cpus=1, ram=ByteSize()) - return await dask.get_worker_used_resources( - _scheduler_url(app), instance.ec2_instance - ) + try: + num_results_in_memory = await dask.get_worker_still_has_results_in_memory( + _scheduler_url(app), instance.ec2_instance + ) + if num_results_in_memory > 0: + # NOTE: this is a trick to consider the node still useful + return Resources(cpus=0, ram=ByteSize(1024 * 1024 * 1024)) + return await dask.get_worker_used_resources( + _scheduler_url(app), instance.ec2_instance + ) + except (DaskWorkerNotFoundError, 
DaskNoWorkersError): + return Resources.create_as_empty() @staticmethod async def compute_cluster_used_resources( @@ -146,6 +137,25 @@ async def compute_cluster_used_resources( async def compute_cluster_total_resources( app: FastAPI, instances: list[AssociatedInstance] ) -> Resources: - return await dask.compute_cluster_total_resources( - _scheduler_url(app), instances + try: + return await dask.compute_cluster_total_resources( + _scheduler_url(app), instances + ) + except DaskNoWorkersError: + return Resources.create_as_empty() + + @staticmethod + async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool: + if not utils_docker.is_node_ready_and_available( + instance.node, Availability.active + ): + return False + + # now check if dask-scheduler is available + return await dask.is_worker_connected( + _scheduler_url(app), instance.ec2_instance ) + + @staticmethod + async def try_retire_nodes(app: FastAPI) -> None: + await dask.try_retire_nodes(_scheduler_url(app)) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py index 3e1737b814c..76e808b1b60 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py @@ -1,19 +1,12 @@ -from collections.abc import Iterable - from aws_library.ec2.models import EC2InstanceData, EC2Tags, Resources from fastapi import FastAPI from models_library.docker import DockerLabelKey -from models_library.generated_models.docker_rest_api import Node, Task +from models_library.generated_models.docker_rest_api import Availability, Node, Task from servicelib.logging_utils import LogLevelInt from types_aiobotocore_ec2.literals import InstanceTypeType from ..core.settings import get_application_settings -from ..models import ( - AssignedTasksToInstance, - AssignedTasksToInstanceType, - AssociatedInstance, -) -from ..utils import dynamic_scaling as utils +from ..models import AssociatedInstance from ..utils import utils_docker, utils_ec2 from ..utils.rabbitmq import log_tasks_message, progress_tasks_message from .auto_scaling_mode_base import BaseAutoscaling @@ -51,36 +44,6 @@ async def list_unrunnable_tasks(app: FastAPI) -> list[Task]: service_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS, ) - @staticmethod - def try_assigning_task_to_node( - task, instances_to_tasks: Iterable[tuple[AssociatedInstance, list]] - ) -> bool: - return utils.try_assigning_task_to_node(task, instances_to_tasks) - - @staticmethod - async def try_assigning_task_to_instances( - app: FastAPI, - pending_task, - instances_to_tasks: list[AssignedTasksToInstance], - *, - notify_progress: bool - ) -> bool: - return await utils.try_assigning_task_to_instances( - app, - pending_task, - instances_to_tasks, - notify_progress=notify_progress, - ) - - @staticmethod - def try_assigning_task_to_instance_types( - pending_task, - instance_types_to_tasks: list[AssignedTasksToInstanceType], - ) -> bool: - return utils.try_assigning_task_to_instance_types( - pending_task, instance_types_to_tasks - ) - @staticmethod async def log_message_from_tasks( app: FastAPI, tasks: list, message: str, *, level: LogLevelInt @@ -94,7 +57,7 @@ async def progress_message_from_tasks( await progress_tasks_message(app, tasks, progress=progress) @staticmethod - def get_max_resources_from_task(task) -> 
Resources: + def get_task_required_resources(task) -> Resources: return utils_docker.get_max_resources_from_docker_task(task) @staticmethod @@ -133,3 +96,15 @@ async def compute_cluster_total_resources( return await utils_docker.compute_cluster_total_resources( [i.node for i in instances] ) + + @staticmethod + async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool: + assert app # nosec + return utils_docker.is_node_ready_and_available( + instance.node, Availability.active + ) + + @staticmethod + async def try_retire_nodes(app: FastAPI) -> None: + assert app # nosec + # nothing to do here diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py index 7982672f91d..47a5488b244 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py @@ -1,5 +1,7 @@ import contextlib import logging +import re +from collections import defaultdict from collections.abc import AsyncIterator, Coroutine from typing import Any, Final, TypeAlias @@ -49,49 +51,11 @@ async def _scheduler_client(url: AnyUrl) -> AsyncIterator[distributed.Client]: raise DaskSchedulerNotFoundError(url=url) from exc -async def list_unrunnable_tasks(url: AnyUrl) -> list[DaskTask]: - """ - Raises: - DaskSchedulerNotFoundError - """ - - def _list_tasks( - dask_scheduler: distributed.Scheduler, - ) -> dict[str, dict[str, Any]]: - return { - task.key: task.resource_restrictions for task in dask_scheduler.unrunnable - } - - async with _scheduler_client(url) as client: - list_of_tasks: dict[ - DaskTaskId, DaskTaskResources - ] = await _wrap_client_async_routine(client.run_on_scheduler(_list_tasks)) - _logger.info("found unrunnable tasks: %s", list_of_tasks) - return [ - DaskTask(task_id=task_id, required_resources=task_resources) - for task_id, task_resources in list_of_tasks.items() - ] - - -async def list_processing_tasks(url: AnyUrl) -> list[DaskTaskId]: - """ - Raises: - DaskSchedulerNotFoundError - """ - async with _scheduler_client(url) as client: - processing_tasks = set() - if worker_to_processing_tasks := await _wrap_client_async_routine( - client.processing() - ): - _logger.info("cluster worker processing: %s", worker_to_processing_tasks) - for tasks in worker_to_processing_tasks.values(): - processing_tasks |= set(tasks) - - return list(processing_tasks) - - DaskWorkerUrl: TypeAlias = str DaskWorkerDetails: TypeAlias = dict[str, Any] +DASK_NAME_PATTERN: Final[re.Pattern] = re.compile( + r"^(?P<name>.+)_(?P<private_ip>ip-\d{1,3}-\d{1,3}-\d{1,3}-\d{1,3})[-_].*$" +) def _dask_worker_from_ec2_instance( @@ -104,7 +68,6 @@ def _dask_worker_from_ec2_instance( DaskWorkerNotFoundError """ node_hostname = node_host_name_from_ec2_private_dns(ec2_instance) - node_ip = node_ip_from_ec2_private_dns(ec2_instance) scheduler_info = client.scheduler_info() assert client.scheduler # nosec if "workers" not in scheduler_info or not scheduler_info["workers"]: @@ -118,19 +81,90 @@ def _find_by_worker_host( dask_worker: tuple[DaskWorkerUrl, DaskWorkerDetails] ) -> bool: _, details = dask_worker - return bool(details["host"] == node_ip) or bool( - node_hostname in details["name"] - ) + if match := re.match(DASK_NAME_PATTERN, details["name"]): + return bool(match.group("private_ip") == node_hostname) + return False filtered_workers = dict(filter(_find_by_worker_host, workers.items())) if not filtered_workers: raise DaskWorkerNotFoundError( 
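# An illustrative check of DASK_NAME_PATTERN above; the worker-name format is an
# assumption inferred from this patch, not a documented dask contract:
#   m = DASK_NAME_PATTERN.match("dask-sidecar_ip-10-0-1-15_worker")
#   assert m is not None and m.group("private_ip") == "ip-10-0-1-15"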
worker_host=ec2_instance.aws_private_dns, url=client.scheduler.address ) - assert len(filtered_workers) == 1 # nosec + assert ( + len(filtered_workers) == 1 + ), f"returned workers {filtered_workers}, {node_hostname=}" # nosec return next(iter(filtered_workers.items())) +async def is_worker_connected( + scheduler_url: AnyUrl, worker_ec2_instance: EC2InstanceData +) -> bool: + with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError): + async with _scheduler_client(scheduler_url) as client: + _dask_worker_from_ec2_instance(client, worker_ec2_instance) + return True + return False + + +async def list_unrunnable_tasks(url: AnyUrl) -> list[DaskTask]: + """ + Raises: + DaskSchedulerNotFoundError + """ + + def _list_tasks( + dask_scheduler: distributed.Scheduler, + ) -> dict[str, dict[str, Any]]: + return { + task.key: task.resource_restrictions for task in dask_scheduler.unrunnable + } + + async with _scheduler_client(url) as client: + list_of_tasks: dict[ + DaskTaskId, DaskTaskResources + ] = await _wrap_client_async_routine(client.run_on_scheduler(_list_tasks)) + _logger.debug("found unrunnable tasks: %s", list_of_tasks) + return [ + DaskTask(task_id=task_id, required_resources=task_resources) + for task_id, task_resources in list_of_tasks.items() + ] + + +async def list_processing_tasks_per_worker( + url: AnyUrl, +) -> dict[DaskWorkerUrl, list[DaskTask]]: + """ + Raises: + DaskSchedulerNotFoundError + """ + + def _list_processing_tasks( + dask_scheduler: distributed.Scheduler, + ) -> dict[str, list[tuple[DaskTaskId, DaskTaskResources]]]: + worker_to_processing_tasks = defaultdict(list) + for task_key, task_state in dask_scheduler.tasks.items(): + if task_state.processing_on: + worker_to_processing_tasks[task_state.processing_on.address].append( + (task_key, task_state.resource_restrictions) + ) + return worker_to_processing_tasks + + async with _scheduler_client(url) as client: + worker_to_tasks: dict[ + str, list[tuple[DaskTaskId, DaskTaskResources]] + ] = await _wrap_client_async_routine( + client.run_on_scheduler(_list_processing_tasks) + ) + _logger.debug("found processing tasks: %s", worker_to_tasks) + tasks_per_worker = defaultdict(list) + for worker, tasks in worker_to_tasks.items(): + for task_id, required_resources in tasks: + tasks_per_worker[worker].append( + DaskTask(task_id=task_id, required_resources=required_resources) + ) + return tasks_per_worker + + async def get_worker_still_has_results_in_memory( url: AnyUrl, ec2_instance: EC2InstanceData ) -> int: @@ -160,25 +194,26 @@ async def get_worker_used_resources( """ def _get_worker_used_resources( - dask_scheduler: distributed.Scheduler, - ) -> dict[str, dict]: - used_resources = {} + dask_scheduler: distributed.Scheduler, *, worker_url: str + ) -> dict[str, float] | None: for worker_name, worker_state in dask_scheduler.workers.items(): - used_resources[worker_name] = worker_state.used_resources - return used_resources + if worker_url != worker_name: + continue + if worker_state.status is distributed.Status.closing_gracefully: + # NOTE: when a worker was retired it is in this state + return {} + return dict(worker_state.used_resources) + return None async with _scheduler_client(url) as client: worker_url, _ = _dask_worker_from_ec2_instance(client, ec2_instance) # now get the used resources - used_resources_per_worker: dict[ - str, dict[str, Any] - ] = await _wrap_client_async_routine( - client.run_on_scheduler(_get_worker_used_resources) + worker_used_resources: dict[str, Any] | None = await 
_wrap_client_async_routine( + client.run_on_scheduler(_get_worker_used_resources, worker_url=worker_url), ) - if worker_url not in used_resources_per_worker: + if worker_used_resources is None: raise DaskWorkerNotFoundError(worker_host=worker_url, url=url) - worker_used_resources = used_resources_per_worker[worker_url] return Resources( cpus=worker_used_resources.get("CPU", 0), ram=parse_obj_as(ByteSize, worker_used_resources.get("RAM", 0)), @@ -203,3 +238,10 @@ async def compute_cluster_total_resources( continue return Resources.create_as_empty() + + +async def try_retire_nodes(url: AnyUrl) -> None: + async with _scheduler_client(url) as client: + await _wrap_client_async_routine( + client.retire_workers(close_workers=False, remove=False) + ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/remote_debug.py b/services/autoscaling/src/simcore_service_autoscaling/modules/remote_debug.py new file mode 100644 index 00000000000..318f7a11a02 --- /dev/null +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/remote_debug.py @@ -0,0 +1,26 @@ +""" Setup remote debugger with debugpy - a debugger for Python + https://github.com/microsoft/debugpy + +""" +import logging + +from fastapi import FastAPI + +_logger = logging.getLogger(__name__) +_REMOTE_DEBUGGING_PORT = 3000 + + +def setup_remote_debugging(app: FastAPI) -> None: + def on_startup() -> None: + try: + _logger.info("Attaching debugpy on %s...", _REMOTE_DEBUGGING_PORT) + + import debugpy + + debugpy.listen(("0.0.0.0", _REMOTE_DEBUGGING_PORT)) # nosec # noqa: S104 + + except ImportError as err: # pragma: no cover + msg = "Cannot enable remote debugging. Please install debugpy first" + raise RuntimeError(msg) from err + + app.add_event_handler("startup", on_startup) diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py index f0fe674e9db..0cfd72d12f5 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py @@ -7,18 +7,13 @@ EC2InstanceBootSpecific, EC2InstanceData, EC2InstanceType, - Resources, ) from models_library.generated_models.docker_rest_api import Node from types_aiobotocore_ec2.literals import InstanceTypeType from ..core.errors import Ec2InstanceInvalidError, Ec2InvalidDnsNameError from ..core.settings import ApplicationSettings -from ..models import ( - AssignedTasksToInstance, - AssignedTasksToInstanceType, - AssociatedInstance, -) +from ..models import AssociatedInstance from ..modules.auto_scaling_mode_base import BaseAutoscaling from . 
import utils_docker @@ -73,7 +68,9 @@ def _find_node_with_name(node: Node) -> bool: continue if node := next(iter(filter(_find_node_with_name, nodes)), None): - associated_instances.append(AssociatedInstance(node, instance_data)) + associated_instances.append( + AssociatedInstance(node=node, ec2_instance=instance_data) + ) else: non_associated_instances.append(instance_data) return associated_instances, non_associated_instances @@ -115,68 +112,6 @@ def _instance_type_by_type_name( return bool(ec2_type.name == type_name) -def _instance_type_map_by_type_name( - mapping: AssignedTasksToInstanceType, *, type_name: InstanceTypeType | None -) -> bool: - return _instance_type_by_type_name(mapping.instance_type, type_name=type_name) - - -def _instance_data_map_by_type_name( - mapping: AssignedTasksToInstance, *, type_name: InstanceTypeType | None -) -> bool: - if type_name is None: - return True - return bool(mapping.instance.type == type_name) - - -def filter_by_task_defined_instance( - instance_type_name: InstanceTypeType | None, - active_instances_to_tasks: list[AssignedTasksToInstance], - pending_instances_to_tasks: list[AssignedTasksToInstance], - drained_instances_to_tasks: list[AssignedTasksToInstance], - needed_new_instance_types_for_tasks: list[AssignedTasksToInstanceType], -) -> tuple[ - list[AssignedTasksToInstance], - list[AssignedTasksToInstance], - list[AssignedTasksToInstance], - list[AssignedTasksToInstanceType], -]: - return ( - list( - filter( - functools.partial( - _instance_data_map_by_type_name, type_name=instance_type_name - ), - active_instances_to_tasks, - ) - ), - list( - filter( - functools.partial( - _instance_data_map_by_type_name, type_name=instance_type_name - ), - pending_instances_to_tasks, - ) - ), - list( - filter( - functools.partial( - _instance_data_map_by_type_name, type_name=instance_type_name - ), - drained_instances_to_tasks, - ) - ), - list( - filter( - functools.partial( - _instance_type_map_by_type_name, type_name=instance_type_name - ), - needed_new_instance_types_for_tasks, - ) - ), - ) - - def find_selected_instance_type_for_task( instance_type_name: InstanceTypeType, available_ec2_types: list[EC2InstanceType], @@ -202,12 +137,13 @@ def find_selected_instance_type_for_task( selected_instance = filtered_instances[0] # check that the assigned resources and the machine resource fit - if auto_scaling_mode.get_max_resources_from_task(task) > Resources( - cpus=selected_instance.cpus, ram=selected_instance.ram + if ( + auto_scaling_mode.get_task_required_resources(task) + > selected_instance.resources ): msg = ( f"Task {task} requires more resources than the selected instance provides." - f" Asked for {selected_instance}, but task needs {auto_scaling_mode.get_max_resources_from_task(task)}. Please check!" + f" Asked for {selected_instance}, but task needs {auto_scaling_mode.get_task_required_resources(task)}. Please check!" 
) raise Ec2InstanceInvalidError(msg=msg) diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py b/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py index a640ca9b7c8..de13a16f49b 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py @@ -1,22 +1,12 @@ -import datetime import logging -from collections.abc import Iterable from typing import Final from aws_library.ec2.models import Resources from dask_task_models_library.resource_constraints import ( get_ec2_instance_type_from_resources, ) -from fastapi import FastAPI -from servicelib.utils_formatting import timedelta_as_minute_second -from ..core.settings import get_application_settings -from ..models import ( - AssignedTasksToInstance, - AssignedTasksToInstanceType, - AssociatedInstance, - DaskTask, -) +from ..models import DaskTask _logger = logging.getLogger(__name__) @@ -24,7 +14,7 @@ _DEFAULT_MAX_RAM: Final[int] = 1024 -def get_max_resources_from_dask_task(task: DaskTask) -> Resources: +def resources_from_dask_task(task: DaskTask) -> Resources: return Resources( cpus=task.required_resources.get("CPU", _DEFAULT_MAX_CPU), ram=task.required_resources.get("RAM", _DEFAULT_MAX_RAM), @@ -36,89 +26,3 @@ def get_task_instance_restriction(task: DaskTask) -> str | None: task.required_resources ) return instance_ec2_type - - -def _compute_tasks_needed_resources(tasks: list[DaskTask]) -> Resources: - total = Resources.create_as_empty() - for t in tasks: - total += get_max_resources_from_dask_task(t) - return total - - -def try_assigning_task_to_node( - pending_task: DaskTask, - instance_to_tasks: Iterable[tuple[AssociatedInstance, list[DaskTask]]], -) -> bool: - for instance, node_assigned_tasks in instance_to_tasks: - instance_total_resource = instance.ec2_instance.resources - tasks_needed_resources = _compute_tasks_needed_resources(node_assigned_tasks) - if ( - instance_total_resource - tasks_needed_resources - ) >= get_max_resources_from_dask_task(pending_task): - node_assigned_tasks.append(pending_task) - return True - return False - - -async def try_assigning_task_to_instances( - app: FastAPI, - pending_task: DaskTask, - instances_to_tasks: list[AssignedTasksToInstance], - *, - notify_progress: bool, -) -> bool: - app_settings = get_application_settings(app) - assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec - instance_max_time_to_start = ( - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME - ) - for assigned_tasks_to_instance in instances_to_tasks: - tasks_needed_resources = _compute_tasks_needed_resources( - assigned_tasks_to_instance.assigned_tasks - ) - if ( - assigned_tasks_to_instance.available_resources - tasks_needed_resources - ) >= get_max_resources_from_dask_task(pending_task): - assigned_tasks_to_instance.assigned_tasks.append(pending_task) - if notify_progress: - now = datetime.datetime.now(datetime.timezone.utc) - time_since_launch = ( - now - assigned_tasks_to_instance.instance.launch_time - ) - estimated_time_to_completion = ( - assigned_tasks_to_instance.instance.launch_time - + instance_max_time_to_start - - now - ) - _logger.info( - "LOG: %s", - f"adding machines to the cluster (time waiting: {timedelta_as_minute_second(time_since_launch)}," - f" est. 
remaining time: {timedelta_as_minute_second(estimated_time_to_completion)})...please wait...", - ) - _logger.info( - "PROGRESS: %s", - time_since_launch.total_seconds() - / instance_max_time_to_start.total_seconds(), - ) - return True - return False - - -def try_assigning_task_to_instance_types( - pending_task: DaskTask, - instance_types_to_tasks: list[AssignedTasksToInstanceType], -) -> bool: - for assigned_tasks_to_instance_type in instance_types_to_tasks: - instance_total_resource = Resources( - cpus=assigned_tasks_to_instance_type.instance_type.cpus, - ram=assigned_tasks_to_instance_type.instance_type.ram, - ) - tasks_needed_resources = _compute_tasks_needed_resources( - assigned_tasks_to_instance_type.assigned_tasks - ) - if ( - instance_total_resource - tasks_needed_resources - ) >= get_max_resources_from_dask_task(pending_task): - assigned_tasks_to_instance_type.assigned_tasks.append(pending_task) - return True - return False diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/dynamic_scaling.py b/services/autoscaling/src/simcore_service_autoscaling/utils/dynamic_scaling.py deleted file mode 100644 index a0d590727f2..00000000000 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/dynamic_scaling.py +++ /dev/null @@ -1,103 +0,0 @@ -import datetime -import logging -from collections.abc import Iterable - -from aws_library.ec2.models import Resources -from fastapi import FastAPI -from models_library.generated_models.docker_rest_api import Task -from servicelib.utils_formatting import timedelta_as_minute_second - -from ..core.settings import get_application_settings -from ..models import ( - AssignedTasksToInstance, - AssignedTasksToInstanceType, - AssociatedInstance, -) -from . import utils_docker -from .rabbitmq import log_tasks_message, progress_tasks_message - -logger = logging.getLogger(__name__) - - -def try_assigning_task_to_node( - pending_task: Task, - instances_to_tasks: Iterable[tuple[AssociatedInstance, list[Task]]], -) -> bool: - for instance, node_assigned_tasks in instances_to_tasks: - instance_total_resource = instance.ec2_instance.resources - tasks_needed_resources = utils_docker.compute_tasks_needed_resources( - node_assigned_tasks - ) - if ( - instance_total_resource - tasks_needed_resources - ) >= utils_docker.get_max_resources_from_docker_task(pending_task): - node_assigned_tasks.append(pending_task) - return True - return False - - -def try_assigning_task_to_instance_types( - pending_task: Task, - instance_types_to_tasks: list[AssignedTasksToInstanceType], -) -> bool: - for assigned_tasks_to_instance_type in instance_types_to_tasks: - instance_total_resource = Resources( - cpus=assigned_tasks_to_instance_type.instance_type.cpus, - ram=assigned_tasks_to_instance_type.instance_type.ram, - ) - tasks_needed_resources = utils_docker.compute_tasks_needed_resources( - assigned_tasks_to_instance_type.assigned_tasks - ) - if ( - instance_total_resource - tasks_needed_resources - ) >= utils_docker.get_max_resources_from_docker_task(pending_task): - assigned_tasks_to_instance_type.assigned_tasks.append(pending_task) - return True - return False - - -async def try_assigning_task_to_instances( - app: FastAPI, - pending_task: Task, - instances_to_tasks: list[AssignedTasksToInstance], - *, - notify_progress: bool, -) -> bool: - app_settings = get_application_settings(app) - assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec - instance_max_time_to_start = ( - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME - ) - for 
assigned_tasks_to_instance in instances_to_tasks: - tasks_needed_resources = utils_docker.compute_tasks_needed_resources( - assigned_tasks_to_instance.assigned_tasks - ) - if ( - assigned_tasks_to_instance.available_resources - tasks_needed_resources - ) >= utils_docker.get_max_resources_from_docker_task(pending_task): - assigned_tasks_to_instance.assigned_tasks.append(pending_task) - if notify_progress: - now = datetime.datetime.now(datetime.timezone.utc) - time_since_launch = ( - now - assigned_tasks_to_instance.instance.launch_time - ) - estimated_time_to_completion = ( - assigned_tasks_to_instance.instance.launch_time - + instance_max_time_to_start - - now - ) - - await log_tasks_message( - app, - [pending_task], - f"adding machines to the cluster (time waiting: {timedelta_as_minute_second(time_since_launch)}, " - f"est. remaining time: {timedelta_as_minute_second(estimated_time_to_completion)})...please wait...", - ) - await progress_tasks_message( - app, - [pending_task], - time_since_launch.total_seconds() - / instance_max_time_to_start.total_seconds(), - ) - return True - return False diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py index 42c8e3376f1..d5d8d4f2cfe 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py @@ -20,6 +20,7 @@ DockerLabelKey, ) from models_library.generated_models.docker_rest_api import ( + Availability, Node, NodeState, Service, @@ -425,7 +426,7 @@ def get_docker_pull_images_on_start_bash_command( write_docker_compose_pull_script_cmd = " ".join( [ "echo", - f'"#!/bin/sh\necho Pulling started at \\$(date)\n{_DOCKER_COMPOSE_CMD} --file={_PRE_PULL_COMPOSE_PATH} pull --ignore-pull-failures"', + f'"#!/bin/sh\necho Pulling started at \\$(date)\n{_DOCKER_COMPOSE_CMD} --project-name=autoscaleprepull --file={_PRE_PULL_COMPOSE_PATH} pull --ignore-pull-failures"', ">", f"{_DOCKER_COMPOSE_PULL_SCRIPT_PATH}", ] @@ -537,3 +538,11 @@ def get__new_node_docker_tags( } | {DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: ec2_instance.type} ) + + +def is_node_ready_and_available(node: Node, availability: Availability) -> bool: + assert node.Status # nosec + assert node.Spec # nosec + return bool( + node.Status.State == NodeState.ready and node.Spec.Availability == availability + ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_ec2.py b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_ec2.py index afb4d224311..23475994622 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_ec2.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_ec2.py @@ -73,12 +73,21 @@ def closest_instance_policy( ec2_instance: EC2InstanceType, resources: Resources, ) -> float: - if ec2_instance.cpus < resources.cpus or ec2_instance.ram < resources.ram: + if ( + ec2_instance.resources.cpus < resources.cpus + or ec2_instance.resources.ram < resources.ram + ): return 0 # compute a score for all the instances that are above expectations # best is the exact ec2 instance - cpu_ratio = float(ec2_instance.cpus - resources.cpus) / float(ec2_instance.cpus) - ram_ratio = float(ec2_instance.ram - resources.ram) / float(ec2_instance.ram) + assert ec2_instance.resources.cpus > 0 # nosec + assert ec2_instance.resources.ram > 0 # nosec + cpu_ratio = float(ec2_instance.resources.cpus - 
resources.cpus) / float( + ec2_instance.resources.cpus + ) + ram_ratio = float(ec2_instance.resources.ram - resources.ram) / float( + ec2_instance.resources.ram + ) return 100 * (1.0 - cpu_ratio) * (1.0 - ram_ratio) diff --git a/services/autoscaling/tests/manual/.env-devel b/services/autoscaling/tests/manual/.env-devel index a56c8ab125b..f0c4a27eac4 100644 --- a/services/autoscaling/tests/manual/.env-devel +++ b/services/autoscaling/tests/manual/.env-devel @@ -13,6 +13,7 @@ EC2_INSTANCES_KEY_NAME=XXXXXXXXXX EC2_INSTANCES_NAME_PREFIX=testing-osparc-computational-cluster EC2_INSTANCES_SECURITY_GROUP_IDS="[\"XXXXXXXXXX\"]" EC2_INSTANCES_SUBNET_ID=XXXXXXXXXX +EC2_INSTANCES_CUSTOM_TAGS='{"special": "testing"}' LOG_FORMAT_LOCAL_DEV_ENABLED=True # define the following to activate dynamic autoscaling # NODES_MONITORING_NEW_NODES_LABELS="[\"testing.autoscaled-node\"]" diff --git a/services/autoscaling/tests/manual/README.md b/services/autoscaling/tests/manual/README.md index baa8c88aa94..b7a2f175009 100644 --- a/services/autoscaling/tests/manual/README.md +++ b/services/autoscaling/tests/manual/README.md @@ -9,6 +9,7 @@ The dynamic mode is used directly with docker swarm facilities. 1. AWS EC2 access 2. a machine running in EC2 with docker installed and access to osparc-simcore repository (for example t2.xlarge to have some computational power) +3. Note that VS Code Remote can be used to code directly on the EC2 instance. ## computational mode diff --git a/services/autoscaling/tests/manual/dask-manual-tester.ipynb b/services/autoscaling/tests/manual/dask-manual-tester.ipynb new file mode 100644 index 00000000000..362b6326b89 --- /dev/null +++ b/services/autoscaling/tests/manual/dask-manual-tester.ipynb @@ -0,0 +1,85 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import distributed" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "client = distributed.Client(\"tcp://XXXXXXXXXXX:8786\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def square(x):\n", + " import time\n", + " time.sleep(15)\n", + " return x ** 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "A = client.map(square, range(500), resources={\"CPU\": 1}, pure=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "del A\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def fct(dask_scheduler: distributed.Scheduler):\n", + " return f\"{dask_scheduler.workers}\"\n", + " # or: return f\"{dask_scheduler.workers_to_close()}\"\n", + "print(client.run_on_scheduler(fct))" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.10" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/services/autoscaling/tests/manual/docker-compose-computational.yml b/services/autoscaling/tests/manual/docker-compose-computational.yml index ed3aca7d163..462e3990476 100644 --- a/services/autoscaling/tests/manual/docker-compose-computational.yml 
+++ b/services/autoscaling/tests/manual/docker-compose-computational.yml @@ -40,6 +40,10 @@ services: ports: - 8786:8786 - 8787:8787 + deploy: + placement: + constraints: + - "node.role==manager" volumes: computational_shared_data: diff --git a/services/autoscaling/tests/manual/docker-compose.yml b/services/autoscaling/tests/manual/docker-compose.yml index 7dbcf5bd4cc..3c409170531 100644 --- a/services/autoscaling/tests/manual/docker-compose.yml +++ b/services/autoscaling/tests/manual/docker-compose.yml @@ -5,7 +5,7 @@ services: init: true hostname: "{{.Node.Hostname}}-{{.Service.Name}}-{{.Task.Slot}}" ports: - - "5672:5672" + # - "5672:5672" - "15672:15672" - "15692" environment: @@ -23,8 +23,8 @@ services: image: "redis:6.2.6@sha256:4bed291aa5efb9f0d77b76ff7d4ab71eee410962965d052552db1fb80576431d" init: true hostname: "{{.Node.Hostname}}-{{.Service.Name}}-{{.Task.Slot}}" - ports: - - "6379:6379" + # ports: + # - "6379:6379" healthcheck: test: [ "CMD", "redis-cli", "ping" ] interval: 5s @@ -33,24 +33,6 @@ services: volumes: - redis-data:/data - redis-commander: - image: rediscommander/redis-commander:latest - init: true - hostname: "{{.Node.Hostname}}-{{.Service.Name}}-{{.Task.Slot}}" - ports: - - "18081:8081" - environment: - - >- - REDIS_HOSTS= - resources:${REDIS_HOST}:${REDIS_PORT}:0, - locks:${REDIS_HOST}:${REDIS_PORT}:1, - validation_codes:${REDIS_HOST}:${REDIS_PORT}:2, - scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3, - user_notifications:${REDIS_HOST}:${REDIS_PORT}:4, - announcements:${REDIS_HOST}:${REDIS_PORT}:5, - distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6 - # If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml - autoscaling: image: local/autoscaling:development dns: 8.8.8.8 # needed to access internet @@ -65,5 +47,9 @@ services: - "/var/run/docker.sock:/var/run/docker.sock" - ../../:/devel/services/autoscaling - ../../../../packages:/devel/packages + deploy: + placement: + constraints: + - "node.role==manager" volumes: redis-data: diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index c13c9be2f16..ca0f36c640a 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -595,6 +595,7 @@ def _creator(**cluter_overrides) -> Cluster: return dataclasses.replace( Cluster( active_nodes=[], + pending_nodes=[], drained_nodes=[], reserve_drained_nodes=[], pending_ec2s=[], diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py index f909caab521..641d2792321 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py @@ -12,7 +12,6 @@ import logging from collections import defaultdict from collections.abc import Callable, Iterator -from copy import deepcopy from dataclasses import dataclass from typing import Any from unittest import mock @@ -34,15 +33,8 @@ from pytest_mock import MockerFixture from pytest_simcore.helpers.utils_envs import EnvVarsDict, setenvs_from_dict from simcore_service_autoscaling.core.settings import ApplicationSettings -from simcore_service_autoscaling.models import ( - AssociatedInstance, - Cluster, - EC2InstanceData, -) -from simcore_service_autoscaling.modules.auto_scaling_core import ( - _deactivate_empty_nodes, - auto_scale_cluster, -) +from simcore_service_autoscaling.models import 
EC2InstanceData +from simcore_service_autoscaling.modules.auto_scaling_core import auto_scale_cluster from simcore_service_autoscaling.modules.auto_scaling_mode_computational import ( ComputationalAutoscaling, ) @@ -312,6 +304,15 @@ def mock_dask_get_worker_used_resources(mocker: MockerFixture) -> mock.Mock: ) +@pytest.fixture +def mock_dask_is_worker_connected(mocker: MockerFixture) -> mock.Mock: + return mocker.patch( + "simcore_service_autoscaling.modules.dask.is_worker_connected", + return_value=True, + autospec=True, + ) + + async def _create_task_with_resources( ec2_client: EC2Client, dask_task_imposed_ec2_type: InstanceTypeType | None, @@ -384,6 +385,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 mock_docker_compute_node_used_resources: mock.Mock, mock_dask_get_worker_has_results_in_memory: mock.Mock, mock_dask_get_worker_used_resources: mock.Mock, + mock_dask_is_worker_connected: mock.Mock, mocker: MockerFixture, dask_spec_local_cluster: distributed.SpecCluster, create_dask_task_resources: Callable[..., DaskTaskResources], @@ -425,6 +427,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 mock_docker_compute_node_used_resources.assert_not_called() mock_dask_get_worker_has_results_in_memory.assert_not_called() mock_dask_get_worker_used_resources.assert_not_called() + mock_dask_is_worker_connected.assert_not_called() # check rabbit messages were sent _assert_rabbit_autoscaling_message_sent( mock_rabbitmq_post_message, @@ -444,6 +447,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 mock_dask_get_worker_has_results_in_memory.reset_mock() mock_dask_get_worker_used_resources.assert_called_once() mock_dask_get_worker_used_resources.reset_mock() + mock_dask_is_worker_connected.assert_not_called() internal_dns_names = await _assert_ec2_instances( ec2_client, num_reservations=1, @@ -494,6 +498,9 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 await auto_scale_cluster( app=initialized_app, auto_scaling_mode=auto_scaling_mode ) + mock_dask_is_worker_connected.assert_called() + assert mock_dask_is_worker_connected.call_count == num_useless_calls + mock_dask_is_worker_connected.reset_mock() mock_dask_get_worker_has_results_in_memory.assert_called() assert ( mock_dask_get_worker_has_results_in_memory.call_count == 2 * num_useless_calls @@ -525,6 +532,8 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 del dask_future await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode) + mock_dask_is_worker_connected.assert_called_once() + mock_dask_is_worker_connected.reset_mock() mock_dask_get_worker_has_results_in_memory.assert_called() assert mock_dask_get_worker_has_results_in_memory.call_count == 2 mock_dask_get_worker_has_results_in_memory.reset_mock() @@ -925,69 +934,3 @@ async def test_cluster_scaling_up_more_than_allowed_with_multiple_types_max_star assert len(all_instances["Reservations"]) == len( aws_allowed_ec2_instance_type_names ) - - -@pytest.fixture -def fake_associated_host_instance( - host_node: DockerNode, - fake_localhost_ec2_instance_data: EC2InstanceData, -) -> AssociatedInstance: - return AssociatedInstance( - host_node, - fake_localhost_ec2_instance_data, - ) - - -async def test__deactivate_empty_nodes( - minimal_configuration: None, - initialized_app: FastAPI, - cluster: Callable[..., Cluster], - host_node: DockerNode, - fake_associated_host_instance: AssociatedInstance, - mock_docker_set_node_availability: mock.Mock, -): - # since we have no service running, we expect the passed node 
to be set to drain - active_cluster = cluster(active_nodes=[fake_associated_host_instance]) - updated_cluster = await _deactivate_empty_nodes( - initialized_app, active_cluster, ComputationalAutoscaling() - ) - assert not updated_cluster.active_nodes - assert len(updated_cluster.drained_nodes) == len(active_cluster.active_nodes) - mock_docker_set_node_availability.assert_called_once_with( - mock.ANY, host_node, available=False - ) - - -async def test__deactivate_empty_nodes_with_finished_tasks_should_not_deactivate_until_tasks_are_retrieved( - minimal_configuration: None, - initialized_app: FastAPI, - cluster: Callable[..., Cluster], - host_node: DockerNode, - fake_associated_host_instance: AssociatedInstance, - mock_docker_set_node_availability: mock.Mock, - create_dask_task: Callable[[DaskTaskResources], distributed.Future], -): - dask_future = create_dask_task({}) - assert dask_future - # NOTE: this sucks, but it seems that as soon as we use any method of the future it returns the data to the caller - await asyncio.sleep(4) - # since we have result still in memory, the node shall remain active - active_cluster = cluster(active_nodes=[fake_associated_host_instance]) - - updated_cluster = await _deactivate_empty_nodes( - initialized_app, deepcopy(active_cluster), ComputationalAutoscaling() - ) - assert updated_cluster.active_nodes - mock_docker_set_node_availability.assert_not_called() - - # now removing the dask_future shall remove the result from the memory - del dask_future - await asyncio.sleep(4) - updated_cluster = await _deactivate_empty_nodes( - initialized_app, deepcopy(active_cluster), ComputationalAutoscaling() - ) - assert not updated_cluster.active_nodes - assert len(updated_cluster.drained_nodes) == len(active_cluster.active_nodes) - mock_docker_set_node_availability.assert_called_once_with( - mock.ANY, host_node, available=False - ) diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py index ef1d31de6f8..6b11152ab09 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py @@ -22,6 +22,7 @@ from models_library.docker import ( DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY, DockerLabelKey, + StandardSimcoreDockerLabels, ) from models_library.generated_models.docker_rest_api import ( Availability, @@ -370,7 +371,7 @@ async def _assert_ec2_instances( assert "Instances" in reservation assert ( len(reservation["Instances"]) == num_instances - ), f"created {num_instances} instances of {reservation['Instances'][0]['InstanceType'] if num_instances > 0 else 'n/a'}" + ), f"expected {num_instances}, found {len(reservation['Instances'])}" for instance in reservation["Instances"]: assert "InstanceType" in instance assert instance["InstanceType"] == instance_type @@ -721,6 +722,7 @@ class _ScaleUpParams: async def test_cluster_scaling_up_starts_multiple_instances( minimal_configuration: None, service_monitored_labels: dict[DockerLabelKey, str], + osparc_docker_label_keys: StandardSimcoreDockerLabels, app_settings: ApplicationSettings, initialized_app: FastAPI, create_service: Callable[ @@ -748,7 +750,8 @@ async def test_cluster_scaling_up_starts_multiple_instances( int(scale_up_params.service_resources.cpus), scale_up_params.service_resources.ram, ), - service_monitored_labels, + service_monitored_labels + | osparc_docker_label_keys.to_simcore_runtime_docker_labels(), "pending", [ 
f"node.labels.{DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY}=={scale_up_params.imposed_instance_type}" @@ -798,11 +801,11 @@ async def test__deactivate_empty_nodes( ): # since we have no service running, we expect the passed node to be set to drain active_cluster = cluster( - active_nodes=[AssociatedInstance(host_node, fake_ec2_instance_data())] - ) - updated_cluster = await _deactivate_empty_nodes( - initialized_app, active_cluster, DynamicAutoscaling() + active_nodes=[ + AssociatedInstance(node=host_node, ec2_instance=fake_ec2_instance_data()) + ] ) + updated_cluster = await _deactivate_empty_nodes(initialized_app, active_cluster) assert not updated_cluster.active_nodes assert len(updated_cluster.drained_nodes) == len(active_cluster.active_nodes) mock_docker_set_node_availability.assert_called_once_with( @@ -834,11 +837,11 @@ async def test__deactivate_empty_nodes_to_drain_when_services_running_are_missin "running", ) active_cluster = cluster( - active_nodes=[AssociatedInstance(host_node, fake_ec2_instance_data())] - ) - updated_cluster = await _deactivate_empty_nodes( - initialized_app, active_cluster, DynamicAutoscaling() + active_nodes=[ + AssociatedInstance(node=host_node, ec2_instance=fake_ec2_instance_data()) + ] ) + updated_cluster = await _deactivate_empty_nodes(initialized_app, active_cluster) assert not updated_cluster.active_nodes assert len(updated_cluster.drained_nodes) == len(active_cluster.active_nodes) mock_docker_set_node_availability.assert_called_once_with( @@ -874,12 +877,34 @@ async def test__deactivate_empty_nodes_does_not_drain_if_service_is_running_with ) # since we have no service running, we expect the passed node to be set to drain - active_cluster = cluster( - active_nodes=[AssociatedInstance(host_node, fake_ec2_instance_data())] + assert host_node.Description + assert host_node.Description.Resources + assert host_node.Description.Resources.NanoCPUs + host_node_resources = Resources.parse_obj( + { + "ram": host_node.Description.Resources.MemoryBytes, + "cpus": host_node.Description.Resources.NanoCPUs / 10**9, + } ) - updated_cluster = await _deactivate_empty_nodes( - initialized_app, active_cluster, DynamicAutoscaling() + fake_ec2_instance = fake_ec2_instance_data(resources=host_node_resources) + fake_associated_instance = AssociatedInstance( + node=host_node, ec2_instance=fake_ec2_instance + ) + node_used_resources = await DynamicAutoscaling().compute_node_used_resources( + initialized_app, fake_associated_instance + ) + assert node_used_resources + + active_cluster = cluster( + active_nodes=[ + AssociatedInstance( + node=host_node, + ec2_instance=fake_ec2_instance, + available_resources=host_node_resources - node_used_resources, + ) + ] ) + updated_cluster = await _deactivate_empty_nodes(initialized_app, active_cluster) assert updated_cluster == active_cluster mock_docker_set_node_availability.assert_not_called() @@ -893,9 +918,13 @@ async def test__find_terminateable_nodes_with_no_hosts( ): # there is no node to terminate here since nothing is drained active_cluster = cluster( - active_nodes=[AssociatedInstance(host_node, fake_ec2_instance_data())], + active_nodes=[ + AssociatedInstance(node=host_node, ec2_instance=fake_ec2_instance_data()) + ], drained_nodes=[], - reserve_drained_nodes=[AssociatedInstance(host_node, fake_ec2_instance_data())], + reserve_drained_nodes=[ + AssociatedInstance(node=host_node, ec2_instance=fake_ec2_instance_data()) + ], ) assert await _find_terminateable_instances(initialized_app, active_cluster) == [] @@ -921,8 
+950,8 @@ def _creator(node: Node, terminateable_time: bool) -> AssociatedInstance: else datetime.timedelta(seconds=10) ) return AssociatedInstance( - node, - fake_ec2_instance_data( + node=node, + ec2_instance=fake_ec2_instance_data( launch_time=datetime.datetime.now(datetime.timezone.utc) - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION - datetime.timedelta( @@ -973,10 +1002,9 @@ async def test__activate_drained_nodes_with_no_tasks( ): # no tasks, does nothing and returns True empty_cluster = cluster() - still_pending_tasks, updated_cluster = await _activate_drained_nodes( - initialized_app, empty_cluster, [], DynamicAutoscaling() + updated_cluster = await _activate_drained_nodes( + initialized_app, empty_cluster, DynamicAutoscaling() ) - assert not still_pending_tasks assert updated_cluster == empty_cluster active_cluster = cluster( @@ -988,10 +1016,9 @@ async def test__activate_drained_nodes_with_no_tasks( create_associated_instance(drained_host_node, True) # noqa: FBT003 ], ) - still_pending_tasks, updated_cluster = await _activate_drained_nodes( - initialized_app, active_cluster, [], DynamicAutoscaling() + updated_cluster = await _activate_drained_nodes( + initialized_app, active_cluster, DynamicAutoscaling() ) - assert not still_pending_tasks assert updated_cluster == active_cluster mock_tag_node.assert_not_called() @@ -1031,13 +1058,9 @@ async def test__activate_drained_nodes_with_no_drained_nodes( cluster_without_drained_nodes = cluster( active_nodes=[create_associated_instance(host_node, True)] # noqa: FBT003 ) - still_pending_tasks, updated_cluster = await _activate_drained_nodes( - initialized_app, - cluster_without_drained_nodes, - service_tasks, - DynamicAutoscaling(), + updated_cluster = await _activate_drained_nodes( + initialized_app, cluster_without_drained_nodes, DynamicAutoscaling() ) - assert still_pending_tasks == service_tasks assert updated_cluster == cluster_without_drained_nodes mock_tag_node.assert_not_called() @@ -1080,11 +1103,13 @@ async def test__activate_drained_nodes_with_drained_node( create_associated_instance(drained_host_node, True) # noqa: FBT003 ] ) + cluster_with_drained_nodes.drained_nodes[0].assign_task( + service_tasks[0], Resources(cpus=int(host_cpu_count / 2 + 1), ram=ByteSize(0)) + ) - still_pending_tasks, updated_cluster = await _activate_drained_nodes( - initialized_app, cluster_with_drained_nodes, service_tasks, DynamicAutoscaling() + updated_cluster = await _activate_drained_nodes( + initialized_app, cluster_with_drained_nodes, DynamicAutoscaling() ) - assert not still_pending_tasks assert updated_cluster.active_nodes == cluster_with_drained_nodes.drained_nodes assert drained_host_node.Spec mock_tag_node.assert_called_once_with( diff --git a/services/autoscaling/tests/unit/test_modules_dask.py b/services/autoscaling/tests/unit/test_modules_dask.py index 09b23f84b33..4b30887cf0e 100644 --- a/services/autoscaling/tests/unit/test_modules_dask.py +++ b/services/autoscaling/tests/unit/test_modules_dask.py @@ -9,9 +9,11 @@ import distributed import pytest +from arrow import utcnow from aws_library.ec2.models import Resources from faker import Faker from pydantic import AnyUrl, ByteSize, parse_obj_as +from pytest_simcore.helpers.utils_host import get_localhost_ip from simcore_service_autoscaling.core.errors import ( DaskNoWorkersError, DaskSchedulerNotFoundError, @@ -28,7 +30,7 @@ _scheduler_client, get_worker_still_has_results_in_memory, get_worker_used_resources, - list_processing_tasks, + 
list_processing_tasks_per_worker, list_unrunnable_tasks, ) from tenacity import retry, stop_after_delay, wait_fixed @@ -51,11 +53,12 @@ def scheduler_url(dask_spec_local_cluster: distributed.SpecCluster) -> AnyUrl: def dask_workers_config() -> dict[str, Any]: # NOTE: override of pytest-simcore dask_workers_config to have only 1 worker return { - "single-cpu-worker": { + "single-cpu_worker": { "cls": distributed.Worker, "options": { "nthreads": 2, "resources": {"CPU": 2, "RAM": 48e9}, + "name": f"dask-sidecar_ip-{get_localhost_ip().replace('.', '-')}_{utcnow()}", }, } } @@ -107,21 +110,23 @@ def _add_fct(x: int, y: int) -> int: return x + y # there is nothing now - assert await list_processing_tasks(url=scheduler_url) == [] + assert await list_processing_tasks_per_worker(url=scheduler_url) == {} # this function will be queued and executed as there are no specific resources needed future_queued_task = dask_spec_cluster_client.submit(_add_fct, 2, 5) assert future_queued_task - assert await list_processing_tasks(scheduler_url) == [ - DaskTaskId(future_queued_task.key) - ] + assert await list_processing_tasks_per_worker(scheduler_url) == { + next(iter(dask_spec_cluster_client.scheduler_info()["workers"])): [ + DaskTask(task_id=DaskTaskId(future_queued_task.key), required_resources={}) + ] + } result = await future_queued_task.result(timeout=_REMOTE_FCT_SLEEP_TIME_S + 4) # type: ignore assert result == 7 # nothing processing anymore - assert await list_processing_tasks(url=scheduler_url) == [] + assert await list_processing_tasks_per_worker(url=scheduler_url) == {} _DASK_SCHEDULER_REACTION_TIME_S: Final[int] = 4 diff --git a/services/autoscaling/tests/unit/test_utils_computational_scaling.py b/services/autoscaling/tests/unit/test_utils_computational_scaling.py index fa06a5a18b8..7341c9d470e 100644 --- a/services/autoscaling/tests/unit/test_utils_computational_scaling.py +++ b/services/autoscaling/tests/unit/test_utils_computational_scaling.py @@ -3,31 +3,15 @@ # pylint: disable=unused-variable # pylint: disable=too-many-arguments -import datetime -from collections.abc import Callable -from unittest import mock import pytest -from aws_library.ec2.models import EC2InstanceType, Resources -from faker import Faker -from models_library.generated_models.docker_rest_api import Node as DockerNode +from aws_library.ec2.models import Resources from pydantic import ByteSize, parse_obj_as -from pytest_mock import MockerFixture -from simcore_service_autoscaling.models import ( - AssignedTasksToInstance, - AssignedTasksToInstanceType, - AssociatedInstance, - DaskTask, - DaskTaskResources, - EC2InstanceData, -) +from simcore_service_autoscaling.models import DaskTask, DaskTaskResources from simcore_service_autoscaling.utils.computational_scaling import ( _DEFAULT_MAX_CPU, _DEFAULT_MAX_RAM, - get_max_resources_from_dask_task, - try_assigning_task_to_instance_types, - try_assigning_task_to_instances, - try_assigning_task_to_node, + resources_from_dask_task, ) @@ -64,163 +48,5 @@ ), ], ) -def test_get_max_resources_from_dask_task( - dask_task: DaskTask, expected_resource: Resources -): - assert get_max_resources_from_dask_task(dask_task) == expected_resource - - -@pytest.fixture -def fake_app(mocker: MockerFixture) -> mock.Mock: - app = mocker.Mock() - app.state.settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME = ( - datetime.timedelta(minutes=1) - ) - return app - - -@pytest.fixture -def fake_task(faker: Faker) -> Callable[..., DaskTask]: - def _creator(**overrides) -> DaskTask: - return DaskTask( 
- **( - { - "task_id": faker.uuid4(), - "required_resources": DaskTaskResources(faker.pydict()), - } - | overrides - ) - ) - - return _creator - - -async def test_try_assigning_task_to_node_with_no_instances( - fake_task: Callable[..., DaskTask], -): - task = fake_task() - assert try_assigning_task_to_node(task, []) is False - - -@pytest.fixture -def fake_associated_host_instance( - host_node: DockerNode, - fake_ec2_instance_data: Callable[..., EC2InstanceData], -) -> AssociatedInstance: - return AssociatedInstance( - host_node, - fake_ec2_instance_data(), - ) - - -async def test_try_assigning_task_to_node( - fake_task: Callable[..., DaskTask], - fake_associated_host_instance: AssociatedInstance, -): - task = fake_task(required_resources={"CPU": 2}) - assert fake_associated_host_instance.node.Description - assert fake_associated_host_instance.node.Description.Resources - # we set the node to have 4 CPUs - fake_associated_host_instance.node.Description.Resources.NanoCPUs = int(4e9) - instance_to_tasks: list[tuple[AssociatedInstance, list[DaskTask]]] = [ - (fake_associated_host_instance, []) - ] - assert try_assigning_task_to_node(task, instance_to_tasks) is True - assert instance_to_tasks[0][1] == [task] - # this should work again - assert try_assigning_task_to_node(task, instance_to_tasks) is True - assert instance_to_tasks[0][1] == [task, task] - # this should now fail - assert try_assigning_task_to_node(task, instance_to_tasks) is False - assert instance_to_tasks[0][1] == [task, task] - - -async def test_try_assigning_task_to_instances_with_no_instances( - fake_app: mock.Mock, - fake_task: Callable[..., DaskTask], -): - task = fake_task() - assert ( - await try_assigning_task_to_instances(fake_app, task, [], notify_progress=True) - is False - ) - - -async def test_try_assigning_task_to_instances( - fake_app: mock.Mock, - fake_task: Callable[..., DaskTask], - fake_ec2_instance_data: Callable[..., EC2InstanceData], -): - task = fake_task(required_resources={"CPU": 2}) - ec2_instance = fake_ec2_instance_data() - pending_instance_to_tasks: list[AssignedTasksToInstance] = [ - AssignedTasksToInstance( - instance=ec2_instance, - assigned_tasks=[], - available_resources=Resources(cpus=4, ram=ByteSize(1024**2)), - ) - ] - - # calling once should allow to add that task to the instance - assert ( - await try_assigning_task_to_instances( - fake_app, - task, - pending_instance_to_tasks, - notify_progress=True, - ) - is True - ) - assert pending_instance_to_tasks[0].assigned_tasks == [task] - # calling a second time as well should allow to add that task to the instance - assert ( - await try_assigning_task_to_instances( - fake_app, - task, - pending_instance_to_tasks, - notify_progress=True, - ) - is True - ) - assert pending_instance_to_tasks[0].assigned_tasks == [task, task] - # calling a third time should fail - assert ( - await try_assigning_task_to_instances( - fake_app, - task, - pending_instance_to_tasks, - notify_progress=True, - ) - is False - ) - assert pending_instance_to_tasks[0].assigned_tasks == [task, task] - - -def test_try_assigning_task_to_instance_types_with_empty_types( - fake_task: Callable[..., DaskTask] -): - task = fake_task(required_resources={"CPU": 2}) - assert try_assigning_task_to_instance_types(task, []) is False - - -def test_try_assigning_task_to_instance_types( - fake_task: Callable[..., DaskTask], faker: Faker -): - task = fake_task(required_resources={"CPU": 2}) - # create an instance type with some CPUs - fake_instance_type = EC2InstanceType( - name=faker.name(), 
cpus=6, ram=parse_obj_as(ByteSize, "2GiB") - ) - instance_type_to_tasks: list[AssignedTasksToInstanceType] = [ - AssignedTasksToInstanceType(instance_type=fake_instance_type, assigned_tasks=[]) - ] - # now this should work 3 times - assert try_assigning_task_to_instance_types(task, instance_type_to_tasks) is True - assert instance_type_to_tasks[0].assigned_tasks == [task] - assert try_assigning_task_to_instance_types(task, instance_type_to_tasks) is True - assert instance_type_to_tasks[0].assigned_tasks == [task, task] - assert try_assigning_task_to_instance_types(task, instance_type_to_tasks) is True - assert instance_type_to_tasks[0].assigned_tasks == [task, task, task] - # now it should fail - assert try_assigning_task_to_instance_types(task, instance_type_to_tasks) is False - assert instance_type_to_tasks[0].assigned_tasks == [task, task, task] +def test_resources_from_dask_task(dask_task: DaskTask, expected_resource: Resources): + assert resources_from_dask_task(dask_task) == expected_resource diff --git a/services/autoscaling/tests/unit/test_utils_docker.py b/services/autoscaling/tests/unit/test_utils_docker.py index 7d3dc9cef95..fc0f3d90e8f 100644 --- a/services/autoscaling/tests/unit/test_utils_docker.py +++ b/services/autoscaling/tests/unit/test_utils_docker.py @@ -899,7 +899,7 @@ async def test_tag_node_out_of_sequence_error( 'image: itisfoundation/simcore/services/dynamic/service:23.5.5\nversion: \'"3.8"\'\n"' " > /docker-pull.compose.yml" " && " - 'echo "#!/bin/sh\necho Pulling started at \\$(date)\ndocker compose --file=/docker-pull.compose.yml pull --ignore-pull-failures" > /docker-pull-script.sh' + 'echo "#!/bin/sh\necho Pulling started at \\$(date)\ndocker compose --project-name=autoscaleprepull --file=/docker-pull.compose.yml pull --ignore-pull-failures" > /docker-pull-script.sh' " && " "chmod +x /docker-pull-script.sh" " && " diff --git a/services/autoscaling/tests/unit/test_utils_dynamic_scaling.py b/services/autoscaling/tests/unit/test_utils_dynamic_scaling.py deleted file mode 100644 index ab3b3c55e3f..00000000000 --- a/services/autoscaling/tests/unit/test_utils_dynamic_scaling.py +++ /dev/null @@ -1,96 +0,0 @@ -# pylint: disable=redefined-outer-name -# pylint: disable=unused-argument -# pylint: disable=unused-variable -# pylint: disable=too-many-arguments - - -from collections.abc import Callable -from datetime import timedelta - -import pytest -from aws_library.ec2.models import EC2InstanceData, Resources -from faker import Faker -from models_library.generated_models.docker_rest_api import Task -from pydantic import ByteSize -from pytest_mock import MockerFixture -from simcore_service_autoscaling.models import AssignedTasksToInstance -from simcore_service_autoscaling.utils.dynamic_scaling import ( - try_assigning_task_to_instances, -) - - -@pytest.fixture -def fake_task(faker: Faker) -> Callable[..., Task]: - def _creator(**overrides) -> Task: - return Task( - **({"ID": faker.uuid4(), "Name": faker.pystr(), "Spec": {}} | overrides) - ) - - return _creator - - -async def test_try_assigning_task_to_instances_with_no_instances( - mocker: MockerFixture, - fake_task: Callable[..., Task], -): - fake_app = mocker.Mock() - pending_task = fake_task() - assert ( - await try_assigning_task_to_instances( - fake_app, pending_task, [], notify_progress=True - ) - is False - ) - - -async def test_try_assigning_task_to_instances( - mocker: MockerFixture, - fake_task: Callable[..., Task], - fake_ec2_instance_data: Callable[..., EC2InstanceData], -): - fake_app = mocker.Mock() - 
fake_app.state.settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME = ( - timedelta(minutes=1) - ) - pending_task = fake_task( - Spec={"Resources": {"Reservations": {"NanoCPUs": 2 * 1e9}}} - ) - fake_instance = fake_ec2_instance_data() - pending_instance_to_tasks: list[AssignedTasksToInstance] = [ - AssignedTasksToInstance( - instance=fake_instance, - assigned_tasks=[], - available_resources=Resources(cpus=4, ram=ByteSize(1024**3)), - ) - ] - - # calling once should allow to add that task to the instance - assert ( - await try_assigning_task_to_instances( - fake_app, - pending_task, - pending_instance_to_tasks, - notify_progress=True, - ) - is True - ) - # calling a second time as well should allow to add that task to the instance - assert ( - await try_assigning_task_to_instances( - fake_app, - pending_task, - pending_instance_to_tasks, - notify_progress=True, - ) - is True - ) - # calling a third time should fail - assert ( - await try_assigning_task_to_instances( - fake_app, - pending_task, - pending_instance_to_tasks, - notify_progress=True, - ) - is False - ) diff --git a/services/autoscaling/tests/unit/test_utils_ec2.py b/services/autoscaling/tests/unit/test_utils_ec2.py index 8697c8d5f11..575e17958ad 100644 --- a/services/autoscaling/tests/unit/test_utils_ec2.py +++ b/services/autoscaling/tests/unit/test_utils_ec2.py @@ -34,8 +34,7 @@ def random_fake_available_instances(faker: Faker) -> list[EC2InstanceType]: list_of_instances = [ EC2InstanceType( name=faker.pystr(), - cpus=n, - ram=ByteSize(n), + resources=Resources(cpus=n, ram=ByteSize(n)), ) for n in range(1, 30) ] @@ -59,7 +58,9 @@ async def test_find_best_fitting_ec2_instance_closest_instance_policy_with_resou [ ( Resources(cpus=n, ram=ByteSize(n)), - EC2InstanceType(name="c5ad.12xlarge", cpus=n, ram=ByteSize(n)), + EC2InstanceType( + name="c5ad.12xlarge", resources=Resources(cpus=n, ram=ByteSize(n)) + ), ) for n in range(1, 30) ], @@ -76,10 +77,7 @@ async def test_find_best_fitting_ec2_instance_closest_instance_policy( score_type=closest_instance_policy, ) - SKIPPED_KEYS = ["name"] - for k in found_instance.__dict__: - if k not in SKIPPED_KEYS: - assert getattr(found_instance, k) == getattr(expected_ec2_instance, k) + assert found_instance.resources == expected_ec2_instance.resources def test_compose_user_data(faker: Faker): diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/ec2_instances.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/ec2_instances.py index a68760ad762..ab09c0a128e 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/ec2_instances.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/ec2_instances.py @@ -1,5 +1,3 @@ -from dataclasses import asdict - from aws_library.ec2.models import EC2InstanceType from fastapi import FastAPI from models_library.api_schemas_clusters_keeper.ec2_instances import EC2InstanceTypeGet @@ -17,4 +15,7 @@ async def get_instance_type_details( instance_capabilities: list[EC2InstanceType] = await get_ec2_client( app ).get_ec2_instance_capabilities(instance_type_names) - return [EC2InstanceTypeGet(**asdict(t)) for t in instance_capabilities] + return [ + EC2InstanceTypeGet(name=t.name, cpus=t.resources.cpus, ram=t.resources.ram) + for t in instance_capabilities + ] diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py index 341d7f9d749..e07c6e571bd 100644 --- 
a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py @@ -67,6 +67,8 @@ def _convert_to_env_dict(entries: dict[str, Any]) -> str: startup_commands = ec2_boot_specific.custom_boot_scripts.copy() startup_commands.extend( [ + # NOTE: https://stackoverflow.com/questions/41203492/solving-redis-warnings-on-overcommit-memory-and-transparent-huge-pages-for-ubunt + "sysctl vm.overcommit_memory=1", f"echo '{_docker_compose_yml_base64_encoded()}' | base64 -d > docker-compose.yml", "docker swarm init", f"{' '.join(environment_variables)} docker stack deploy --with-registry-auth --compose-file=docker-compose.yml dask_stack",
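For reference, the `echo '...' | base64 -d > docker-compose.yml` startup command above works because the compose file is shipped as a base64 blob, which survives shell quoting inside the EC2 user-data script. Below is a minimal sketch of the encoding side, assuming a local `docker-compose.yml`; the helper name and path are illustrative, not the service's actual `_docker_compose_yml_base64_encoded()` implementation:

import base64
from pathlib import Path


def compose_yml_as_base64(compose_path: Path) -> str:
    # base64-encode the compose file so it can be embedded in a single
    # quoted shell command and decoded on the instance with `base64 -d`
    return base64.b64encode(compose_path.read_bytes()).decode("ascii")


# instance-side counterpart of the startup command built above (illustrative)
blob = compose_yml_as_base64(Path("docker-compose.yml"))
startup_command = f"echo '{blob}' | base64 -d > docker-compose.yml"
print(startup_command)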