Skip to content

Commit

Permalink
[Console] Status for RFS Backfill (opensearch-project#771)
Browse files Browse the repository at this point in the history
* finish implementing status

Signed-off-by: Mikayla Thompson <thomika@amazon.com>

* Add tests

Signed-off-by: Mikayla Thompson <thomika@amazon.com>

* improve printing the status (a bit)

Signed-off-by: Mikayla Thompson <thomika@amazon.com>

---------

Signed-off-by: Mikayla Thompson <thomika@amazon.com>
  • Loading branch information
mikaylathompson authored Jun 26, 2024
1 parent 3fec905 commit d2118be
Show file tree
Hide file tree
Showing 14 changed files with 471 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,11 @@ def scale_backfill_cmd(ctx, units: int):


@backfill_group.command(name="status")
@click.option('--deep-check', is_flag=True, help='Perform a deep status check of the backfill')
@click.pass_obj
def status_backfill_cmd(ctx, deep_check):
    """Print the current status of the backfill.

    With --deep-check, the backfill implementation may additionally query the
    target cluster for detailed progress (e.g. remaining shards).
    Raises click.ClickException (non-zero exit) when status retrieval fails.
    """
    logger.info(f"Called `console backfill status`, with {deep_check=}")
    exitcode, message = logic_backfill.status(ctx.env.backfill, deep_check=deep_check)
    if exitcode != ExitCode.SUCCESS:
        raise click.ClickException(message)
    click.echo(message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,18 @@ def scale(backfill: Backfill, units: int, *args, **kwargs) -> Tuple[ExitCode, st
return ExitCode.FAILURE, "Backfill scale failed." + "\n" + result.display()


def status(backfill: Backfill, deep_check=False, *args, **kwargs) -> Tuple[ExitCode, str]:
    """Get the status of a backfill.

    Returns (ExitCode.SUCCESS, message) when the backfill reports success,
    otherwise (ExitCode.FAILURE, message). The backfill's status value may be
    either a plain string or a (BackfillStatus, detail-string) pair; both are
    flattened into a single printable message.
    """
    logger.info(f"Getting backfill status with {deep_check=}")
    try:
        result = backfill.get_status(deep_check, *args, **kwargs)
    except NotImplementedError:
        logger.error(f"Status is not implemented for backfill {type(backfill).__name__}")
        return ExitCode.FAILURE, f"Status is not implemented for backfill: {type(backfill).__name__}"
    except Exception as e:
        logger.error(f"Failed to get status of backfill: {e}")
        return ExitCode.FAILURE, f"Failure when getting status of backfill: {type(e).__name__} {e}"

    # Flatten the value up front. NOTE: the previous code put a conditional
    # expression after the string concatenation on the failure path, so the
    # "Backfill status retrieval failed." prefix was silently dropped whenever
    # the status value was a plain string; computing `message` once fixes that.
    message = result.value if isinstance(result.value, str) else f"{result.value[0]}\n{result.value[1]}"
    if result.success:
        return ExitCode.SUCCESS, message
    return ExitCode.FAILURE, "Backfill status retrieval failed." + "\n" + message
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
}


# Lifecycle states a backfill can report; STARTING covers ECS tasks that are
# pending but not yet running.
BackfillStatus = Enum("BackfillStatus", ["NOT_STARTED", "STARTING", "RUNNING", "STOPPED", "FAILED"])


class Backfill(ABC):
Expand Down Expand Up @@ -50,7 +50,7 @@ def stop(self, *args, **kwargs) -> CommandResult:
pass

@abstractmethod
def get_status(self, *args, **kwargs) -> CommandResult:
    """Return the backfill's status as a CommandResult.

    Implementations should set `value` to either a string or a
    (BackfillStatus, detail-string) pair.
    """
    pass

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from console_link.models.osi_utils import (create_pipeline_from_env, start_pipeline, stop_pipeline,
OpenSearchIngestionMigrationProps)
from console_link.models.cluster import Cluster
from console_link.models.backfill_base import Backfill, BackfillStatus
from console_link.models.backfill_base import Backfill
from console_link.models.command_result import CommandResult
from typing import Dict
from cerberus import Validator
Expand Down Expand Up @@ -92,7 +92,7 @@ def stop(self, pipeline_name=None):
pipeline_name = self.osi_props.pipeline_name
stop_pipeline(osi_client=self.osi_client, pipeline_name=pipeline_name)

def get_status(self, *args, **kwargs) -> CommandResult:
    """Status reporting is not implemented for OSI backfills."""
    raise NotImplementedError()

def scale(self, units: int, *args, **kwargs) -> CommandResult:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict
from typing import Dict, Optional
from console_link.models.backfill_base import Backfill, BackfillStatus
from console_link.models.cluster import Cluster
from console_link.models.schema_tools import contains_one_of
Expand Down Expand Up @@ -61,7 +61,7 @@ def start(self, *args, **kwargs) -> CommandResult:
def stop(self, *args, **kwargs) -> CommandResult:
    """Stopping is not implemented for this backfill type."""
    raise NotImplementedError()

def get_status(self, *args, **kwargs) -> CommandResult:
    """Status reporting is not implemented for this backfill type."""
    raise NotImplementedError()

def scale(self, units: int, *args, **kwargs) -> CommandResult:
Expand Down Expand Up @@ -96,3 +96,41 @@ def stop(self, *args, **kwargs) -> CommandResult:
def scale(self, units: int, *args, **kwargs) -> CommandResult:
    """Scale the RFS backfill by setting the ECS service's desired task count to `units`."""
    logger.info(f"Scaling RFS backfill by setting desired count to {units} instances")
    return self.ecs_client.set_desired_count(units)

def get_status(self, deep_check, *args, **kwargs) -> CommandResult:
    """Report the status of the RFS backfill from its ECS task counts.

    Returns a CommandResult whose value is a (BackfillStatus, detail-string)
    pair on success, or an error string on failure. When `deep_check` is
    truthy, a remaining-shard summary from the target cluster is appended to
    the detail string (best-effort: failures are logged, not raised).
    """
    logger.info(f"Getting status of RFS backfill, with {deep_check=}")
    instance_statuses = self.ecs_client.get_instance_statuses()
    if not instance_statuses:
        return CommandResult(False, "Failed to get instance statuses")

    status_string = str(instance_statuses)
    if deep_check:
        # Initialize before the try: previously, if _get_detailed_status()
        # raised, `shard_status` was never bound and the `if shard_status:`
        # check below raised UnboundLocalError instead of degrading gracefully.
        shard_status = None
        try:
            shard_status = self._get_detailed_status()
        except Exception as e:
            logger.error(f"Failed to get detailed status: {e}")
        if shard_status:
            status_string += f"\n{shard_status}"

    # Any running task means the backfill is active; pending-only means tasks
    # are still starting; otherwise the service is effectively stopped.
    if instance_statuses.running > 0:
        return CommandResult(True, (BackfillStatus.RUNNING, status_string))
    elif instance_statuses.pending > 0:
        return CommandResult(True, (BackfillStatus.STARTING, status_string))
    return CommandResult(True, (BackfillStatus.STOPPED, status_string))

def _get_detailed_status(self) -> Optional[str]:
    """Query the target cluster's working-state index for in-progress shards.

    A shard is considered in progress when its document has an `expiration`
    (a lease was taken) but no `completedAt` field. Returns a human-readable
    remaining-shard count, or None when the query returns no hit structure
    (e.g. the index does not exist yet).
    """
    # Leased but not completed == still being migrated.
    status_query = {"query": {
        "bool": {
            "must": [{"exists": {"field": "expiration"}}],
            "must_not": [{"exists": {"field": "completedAt"}}]
        }
    }}
    response = self.target_cluster.call_api("/.migrations_working_state/_search", json_body=status_query)
    r_body = response.json()
    logger.debug(f"Raw response: {r_body}")
    if "hits" in r_body:
        logger.info(f"Hits on detailed status query: {r_body['hits']}")
        logger.info(f"Sample of remaining shards: {[hit['_id'] for hit in r_body['hits']['hits']]}")
        return f"Remaining shards: {r_body['hits']['total']['value']}"
    # Log message previously named "migration_working_state", which does not
    # match the ".migrations_working_state" index actually queried above.
    logger.warning("No hits on detailed status query, .migrations_working_state index may not exist or be populated")
    return None
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ def __init__(self, config: Dict) -> None:
elif 'sigv4' in config:
self.auth_type = AuthMethod.SIGV4

def call_api(self, path, method: HttpMethod = HttpMethod.GET, timeout=None, **kwargs) -> requests.Response:
def call_api(self, path, method: HttpMethod = HttpMethod.GET, timeout=None,
json_body=None, **kwargs) -> requests.Response:
"""
Calls an API on the cluster.
"""
Expand All @@ -100,6 +101,11 @@ def call_api(self, path, method: HttpMethod = HttpMethod.GET, timeout=None, **kw
else:
raise NotImplementedError(f"Auth type {self.auth_type} not implemented")

if json_body is not None:
data = json_body
else:
data = None

logger.info(f"Making api call to {self.endpoint}{path}")

# Extract query parameters from kwargs
Expand All @@ -112,6 +118,7 @@ def call_api(self, path, method: HttpMethod = HttpMethod.GET, timeout=None, **kw
verify=(not self.allow_insecure),
auth=auth,
timeout=timeout,
json=data
)
logger.debug(f"Cluster API call request: {r.request}")
r.raise_for_status()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from typing import NamedTuple, Optional

import boto3

Expand All @@ -9,6 +10,15 @@
logger = logging.getLogger(__name__)


class InstanceStatuses(NamedTuple):
    """Snapshot of an ECS service's task counts.

    All counts default to zero so a bare InstanceStatuses() is a valid
    "nothing scheduled" value.
    """
    running: int = 0   # tasks currently in the RUNNING state
    pending: int = 0   # tasks scheduled but not yet running
    desired: int = 0   # the service's configured desired count

    def __str__(self):
        lines = (f"Running={self.running}",
                 f"Pending={self.pending}",
                 f"Desired={self.desired}")
        return "\n".join(lines)


class ECSService:
def __init__(self, cluster_name, service_name, aws_region=None):
self.cluster_name = cluster_name
Expand All @@ -25,7 +35,6 @@ def set_desired_count(self, desired_count: int) -> CommandResult:
service=self.service_name,
desiredCount=desired_count
)
logger.debug(f"Response from update_service: {response}")

try:
raise_for_aws_api_error(response)
Expand All @@ -38,3 +47,22 @@ def set_desired_count(self, desired_count: int) -> CommandResult:
desired_count = response["service"]["desiredCount"]
return CommandResult(True, f"Service {self.service_name} set to {desired_count} desired count."
f" Currently {running_count} running and {pending_count} pending.")

def get_instance_statuses(self) -> Optional[InstanceStatuses]:
    """Describe the ECS service and return its running/pending/desired task
    counts, or None when the describe call reports an AWS API error."""
    logger.info(f"Getting instance statuses for service {self.service_name}")
    response = self.client.describe_services(cluster=self.cluster_name,
                                             services=[self.service_name])
    try:
        raise_for_aws_api_error(response)
    except AWSAPIError as e:
        logger.error(f"Error getting instance statuses: {e}")
        return None

    # Only one service was requested, so only one entry comes back.
    counts = response["services"][0]
    return InstanceStatuses(running=counts["runningCount"],
                            pending=counts["pendingCount"],
                            desired=counts["desiredCount"])
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ def create(self, *args, **kwargs) -> CommandResult:
logger.error(f"Failed to create snapshot: {str(e)}")
return CommandResult(success=False, value=f"Failed to create snapshot: {str(e)}")

def status(self, *args, deep_check=False, **kwargs) -> CommandResult:
    """Return the snapshot's status.

    `deep_check` (keyword-only, default False) selects the detailed full
    status report over the basic one.
    """
    if deep_check:
        return get_snapshot_status_full(self.source_cluster, self.snapshot_name)
    return get_snapshot_status(self.source_cluster, self.snapshot_name)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
{
"ResponseMetadata": {
"HTTPHeaders": {
"content-length": "18483",
"content-type": "application/x-amz-json-1.1",
"date": "Wed, 26 Jun 2024 05:19:27 GMT",
"x-amzn-requestid": "2840c6e1-4016-40b1-92c4-81537cc8fa44"
},
"HTTPStatusCode": 200,
"RequestId": "2840c6e1-4016-40b1-92c4-81537cc8fa44",
"RetryAttempts": 0
},
"failures": [],
"services": [
{
"clusterArn": "arn:aws:ecs:us-east-1:471142325910:cluster/migration-aws-integ-ecs-cluster",
"createdAt": 1719256381.625,
"createdBy": "arn:aws:iam::471142325910:role/cdk-hnb659fds-cfn-exec-role-471142325910-us-east-1",
"deploymentConfiguration": {
"alarms": {
"alarmNames": [],
"enable": false,
"rollback": false
},
"deploymentCircuitBreaker": {
"enable": false,
"rollback": false
},
"maximumPercent": 200,
"minimumHealthyPercent": 50
},
"deploymentController": {
"type": "ECS"
},
"deployments": [
{
"createdAt": 1719357851.906,
"desiredCount": 5,
"failedTasks": 0,
"id": "ecs-svc/6398738394350162857",
"launchType": "FARGATE",
"networkConfiguration": {
"awsvpcConfiguration": {
"assignPublicIp": "ENABLED",
"securityGroups": [
"sg-06238a66dd0dd5c3c",
"sg-020e147def49d9d63"
],
"subnets": [
"subnet-063807f9cb96e6512",
"subnet-005f438b980128cfc"
]
}
},
"pendingCount": 2,
"platformFamily": "Linux",
"platformVersion": "1.4.0",
"rolloutState": "COMPLETED",
"rolloutStateReason": "ECS deployment ecs-svc/6398738394350162857 completed.",
"runningCount": 1,
"status": "PRIMARY",
"taskDefinition": "arn:aws:ecs:us-east-1:471142325910:task-definition/migration-aws-integ-reindex-from-snapshot:7",
"updatedAt": 1719377448.437
}
],
"desiredCount": 5,
"enableECSManagedTags": false,
"enableExecuteCommand": true,
"events": [],
"launchType": "FARGATE",
"loadBalancers": [],
"networkConfiguration": {
"awsvpcConfiguration": {
"assignPublicIp": "ENABLED",
"securityGroups": ["sg-06238a66dd0dd5c3c", "sg-020e147def49d9d63"],
"subnets": ["subnet-063807f9cb96e6512", "subnet-005f438b980128cfc"]
}
},
"pendingCount": 2,
"placementConstraints": [],
"placementStrategy": [],
"platformFamily": "Linux",
"platformVersion": "LATEST",
"propagateTags": "NONE",
"roleArn": "arn:aws:iam::471142325910:role/aws-service-role/ecs.amazonaws.com/AWSServiceRoleForECS",
"runningCount": 1,
"schedulingStrategy": "REPLICA",
"serviceArn": "arn:aws:ecs:us-east-1:471142325910:service/migration-aws-integ-ecs-cluster/migration-aws-integ-reindex-from-snapshot",
"serviceName": "migration-aws-integ-reindex-from-snapshot",
"serviceRegistries": [],
"status": "ACTIVE",
"taskDefinition": "arn:aws:ecs:us-east-1:471142325910:task-definition/migration-aws-integ-reindex-from-snapshot:7"
}
]
}
Loading

0 comments on commit d2118be

Please sign in to comment.