Skip to content

Commit

Permalink
🐛🎨Computational autoscaling: allow multi-machining/processing (ITISFo…
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg authored Jan 17, 2024
1 parent 0cdb35b commit 1dc7040
Show file tree
Hide file tree
Showing 34 changed files with 890 additions and 1,159 deletions.
19 changes: 9 additions & 10 deletions packages/aws-library/src/aws_library/ec2/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,13 @@ async def get_ec2_instance_capabilities(
list_instances.append(
EC2InstanceType(
name=instance["InstanceType"],
cpus=instance["VCpuInfo"]["DefaultVCpus"],
ram=ByteSize(
int(instance["MemoryInfo"]["SizeInMiB"]) * 1024 * 1024
resources=Resources(
cpus=instance["VCpuInfo"]["DefaultVCpus"],
ram=ByteSize(
int(instance["MemoryInfo"]["SizeInMiB"])
* 1024
* 1024
),
),
)
)
Expand Down Expand Up @@ -173,9 +177,7 @@ async def start_aws_instance(
tags=parse_obj_as(
EC2Tags, {tag["Key"]: tag["Value"] for tag in instance["Tags"]}
),
resources=Resources(
cpus=instance_config.type.cpus, ram=instance_config.type.ram
),
resources=instance_config.type.resources,
)
for instance in instances["Reservations"][0]["Instances"]
]
Expand Down Expand Up @@ -234,10 +236,7 @@ async def get_instances(
else None,
type=instance["InstanceType"],
state=instance["State"]["Name"],
resources=Resources(
cpus=ec2_instance_types[0].cpus,
ram=ec2_instance_types[0].ram,
),
resources=ec2_instance_types[0].resources,
tags=parse_obj_as(
EC2Tags,
{tag["Key"]: tag["Value"] for tag in instance["Tags"]},
Expand Down
13 changes: 8 additions & 5 deletions packages/aws-library/src/aws_library/ec2/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@
Extra,
Field,
NonNegativeFloat,
PositiveInt,
validator,
)
from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType


class Resources(BaseModel):
class Resources(BaseModel, frozen=True):
cpus: NonNegativeFloat
ram: ByteSize

Expand Down Expand Up @@ -53,12 +52,16 @@ def __sub__(self, other: "Resources") -> "Resources":
}
)

@validator("cpus", pre=True)
@classmethod
def _floor_cpus_to_0(cls, v: float) -> float:
return max(v, 0)

@dataclass(frozen=True)

@dataclass(frozen=True, kw_only=True, slots=True)
class EC2InstanceType:
name: InstanceTypeType
cpus: PositiveInt
ram: ByteSize
resources: Resources


InstancePrivateDNSName: TypeAlias = str
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from dataclasses import dataclass

from pydantic import ByteSize, PositiveInt
from pydantic import ByteSize, NonNegativeFloat


@dataclass(frozen=True)
class EC2InstanceTypeGet:
name: str
cpus: PositiveInt
cpus: NonNegativeFloat
ram: ByteSize
4 changes: 2 additions & 2 deletions packages/pytest-simcore/src/pytest_simcore/dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async def dask_spec_local_cluster(
scheduler_address = URL(cluster.scheduler_address)
monkeypatch.setenv(
"COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL",
f"{scheduler_address}" or "invalid",
f"{scheduler_address or 'invalid'}",
)
yield cluster

Expand All @@ -95,7 +95,7 @@ async def dask_local_cluster_without_workers(
scheduler_address = URL(cluster.scheduler_address)
monkeypatch.setenv(
"COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL",
f"{scheduler_address}" or "invalid",
f"{scheduler_address or 'invalid'}",
)
yield cluster

Expand Down
4 changes: 2 additions & 2 deletions services/autoscaling/docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ if [ "${SC_BUILD_TARGET}" = "development" ]; then
fi

if [ "${SC_BOOT_MODE}" = "debug-ptvsd" ]; then
# NOTE: production does NOT pre-installs ptvsd
pip install --no-cache-dir ptvsd
# NOTE: production does NOT pre-installs debugpy
pip install --no-cache-dir debugpy
fi

# Appends docker group if socket is mounted
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

from fastapi import FastAPI
from models_library.basic_types import BootModeEnum
from servicelib.fastapi.prometheus_instrumentation import (
setup_prometheus_instrumentation,
)
Expand All @@ -21,6 +22,7 @@
from ..modules.ec2 import setup as setup_ec2
from ..modules.rabbitmq import setup as setup_rabbitmq
from ..modules.redis import setup as setup_redis
from ..modules.remote_debug import setup_remote_debugging
from .settings import ApplicationSettings

logger = logging.getLogger(__name__)
Expand All @@ -46,6 +48,8 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
setup_prometheus_instrumentation(app)

# PLUGINS SETUP
if settings.SC_BOOT_MODE == BootModeEnum.DEBUG:
setup_remote_debugging(app)
setup_api_routes(app)
setup_docker(app)
setup_rabbitmq(app)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
BootModeEnum,
BuildTargetEnum,
LogLevel,
PortInt,
VersionTag,
)
from models_library.docker import DockerLabelKey
Expand Down Expand Up @@ -182,6 +183,7 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
AUTOSCALING_DEBUG: bool = Field(
default=False, description="Debug mode", env=["AUTOSCALING_DEBUG", "DEBUG"]
)
AUTOSCALING_REMOTE_DEBUG_PORT: PortInt = PortInt(3000)

AUTOSCALING_LOGLEVEL: LogLevel = Field(
LogLevel.INFO, env=["AUTOSCALING_LOGLEVEL", "LOG_LEVEL", "LOGLEVEL"]
Expand Down
74 changes: 58 additions & 16 deletions services/autoscaling/src/simcore_service_autoscaling/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,68 @@
from models_library.generated_models.docker_rest_api import Node


@dataclass(frozen=True, kw_only=True)
class AssignedTasksToInstance:
instance: EC2InstanceData
available_resources: Resources
assigned_tasks: list
@dataclass(frozen=True, slots=True, kw_only=True)
class _TaskAssignmentMixin:
assigned_tasks: list = field(default_factory=list)
available_resources: Resources = field(default_factory=Resources.create_as_empty)

def assign_task(self, task, task_resources: Resources) -> None:
self.assigned_tasks.append(task)
object.__setattr__(
self, "available_resources", self.available_resources - task_resources
)

@dataclass(frozen=True, kw_only=True)
class AssignedTasksToInstanceType:
def has_resources_for_task(self, task_resources: Resources) -> bool:
return bool(self.available_resources >= task_resources)


@dataclass(frozen=True, kw_only=True, slots=True)
class AssignedTasksToInstanceType(_TaskAssignmentMixin):
instance_type: EC2InstanceType
assigned_tasks: list


@dataclass(frozen=True)
class AssociatedInstance:
node: Node
@dataclass(frozen=True, kw_only=True, slots=True)
class _BaseInstance(_TaskAssignmentMixin):
ec2_instance: EC2InstanceData

def __post_init__(self) -> None:
if self.available_resources == Resources.create_as_empty():
object.__setattr__(self, "available_resources", self.ec2_instance.resources)


@dataclass(frozen=True, kw_only=True, slots=True)
class AssociatedInstance(_BaseInstance):
node: Node


@dataclass(frozen=True, kw_only=True, slots=True)
class NonAssociatedInstance(_BaseInstance):
...

@dataclass(frozen=True)

@dataclass(frozen=True, kw_only=True, slots=True)
class Cluster:
active_nodes: list[AssociatedInstance] = field(
metadata={
"description": "This is a EC2 backed docker node which is active (with running tasks)"
"description": "This is a EC2-backed docker node which is active and ready to receive tasks (or with running tasks)"
}
)
pending_nodes: list[AssociatedInstance] = field(
metadata={
"description": "This is a EC2-backed docker node which is active and NOT yet ready to receive tasks"
}
)
drained_nodes: list[AssociatedInstance] = field(
metadata={
"description": "This is a EC2 backed docker node which is drained (with no tasks)"
"description": "This is a EC2-backed docker node which is drained (cannot accept tasks)"
}
)
reserve_drained_nodes: list[AssociatedInstance] = field(
metadata={
"description": "This is a EC2 backed docker node which is drained in the reserve if this is enabled (with no tasks)"
"description": "This is a EC2-backed docker node which is drained in the reserve if this is enabled (with no tasks)"
}
)
pending_ec2s: list[EC2InstanceData] = field(
pending_ec2s: list[NonAssociatedInstance] = field(
metadata={
"description": "This is an EC2 instance that is not yet associated to a docker node"
}
Expand All @@ -54,6 +79,23 @@ class Cluster:
)
terminated_instances: list[EC2InstanceData]

def can_scale_down(self) -> bool:
return bool(
self.active_nodes
or self.pending_nodes
or self.drained_nodes
or self.pending_ec2s
)

def total_number_of_machines(self) -> int:
return (
len(self.active_nodes)
+ len(self.pending_nodes)
+ len(self.drained_nodes)
+ len(self.reserve_drained_nodes)
+ len(self.pending_ec2s)
)


DaskTaskId: TypeAlias = str

Expand Down
Loading

0 comments on commit 1dc7040

Please sign in to comment.