Skip to content

Commit

Permalink
[#411] Implement deep health check
Browse files Browse the repository at this point in the history
  • Loading branch information
wujuu authored and Michal-Kolomanski committed Jan 10, 2023
1 parent 6396879 commit 84607a6
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased] - YYYY-MM-DD
### Added
- /health endpoint [@wujuu]
### Changed
- **BREAKING CHANGE**: Adjust user actions handling to the new user action schema [@JanKapala]
### Fixed
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ pipenv run pytest ./tests
```bash
docker-compose -f docker-compose.testing.yml up && docker-compose -f docker-compose.testing.yml down
```
### Deep health check
You can curl the `/health` server endpoint to check the application's health. It checks:
- Database connection
- User and Service tables row count (has to be at least 10 each)
- Celery workers connection
- JMS (databus) connection
- Recommender engines (each engine can be initialized)

### Recommendation Engines
Recommendation engines ensure that both logged in and non-logged in users receive relevant recommendations.
Expand Down
2 changes: 2 additions & 0 deletions recommender/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from recommender.api.endpoints.recommendations import api as recommendations_ns
from recommender.api.endpoints.user_actions import api as user_actions_ns
from recommender.api.endpoints.update import api as update_ns
from recommender.api.endpoints.health_check import api as health_check_ns


api = Api(
Expand All @@ -22,3 +23,4 @@
api.add_namespace(recommendations_ns)
api.add_namespace(user_actions_ns)
api.add_namespace(update_ns)
api.add_namespace(health_check_ns)
16 changes: 16 additions & 0 deletions recommender/api/endpoints/health_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Health check endpoint definition"""

from __future__ import annotations
from flask_restx import Resource, Namespace
from recommender.services.health_check import deep_health_check

# Namespace that the health-check resource in this module is routed on.
api = Namespace("health", "Endpoint used for checking the application's health")


@api.route("")
class HealthCheck(Resource):
    """REST resource exposing the application's deep health check."""

    def get(self):
        """Run the deep health check and return the resulting report."""
        report = deep_health_check()
        return report
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class RandomInferenceComponent(BaseInferenceComponent):
- all users if necessary (for example during ML training).
"""

engine_name = "random"
engine_name = "Random"
default_explanation = Explanation(
long="This service has been selected at random however taking into"
" account the search criteria",
Expand Down
8 changes: 5 additions & 3 deletions recommender/services/engine_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ def get_default_recommendation_alg(
rec_alg: default recommendation algorithm
"""
rec_alg = os.environ.get(default_engine, "RL")
rec_alg = rec_alg.upper() if len(rec_alg) in {2, 3} else rec_alg.lower()

return rec_alg if rec_alg in engine_names else "RL"
try:
index = [e.lower() for e in list(engine_names)].index(rec_alg.lower())
return list(engine_names)[index]
except ValueError:
return "RL"


def get_engine_names(
Expand Down
152 changes: 152 additions & 0 deletions recommender/services/health_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# pylint: disable=broad-except, too-few-public-methods, protected-access, fixme
"""Module grouping methods for checking the application health"""

from __future__ import annotations

from typing import List, Union, Dict, Optional

import celery
import kombu
import pymongo
import stomp
from flask import current_app
from mongoengine import Document

from recommender.engines.base.base_inference_component import BaseInferenceComponent

from recommender.engines.engines import ENGINES
from recommender.errors import NoSavedMLComponentError
from recommender.models import Service, User
from recommender.tasks import ping

# Minimum number of User documents for the users table to count as healthy.
# TODO: adjust if needed
HEALTHY_USERS_THRESHOLD = 10
# Minimum number of Service documents for the services table to count as healthy.
# TODO: adjust if needed
HEALTHY_SERVICES_THRESHOLD = 10
# Passed as the `K` keyword argument when instantiating each engine during the
# health check; presumably the number of recommendations requested - TODO confirm.
K = 3


class HealthStatus:
    """
    Health report for a single component of the application.

    Reports are arranged as a tree:
    - a leaf is given its status directly as a bool,
    - a node is given a list of child reports and is healthy only when
      every one of its children is healthy.
    """

    name: str
    status: bool
    error: Optional[str]
    components: List[HealthStatus]

    def __init__(
        self,
        name: str,
        status_or_components: Union[bool, List[HealthStatus]],
        error: Optional[str] = None,
    ):
        """
        Create a leaf or a node, depending on the second argument's type.

        Args:
            name: key under which this report appears in its parent's dict.
            status_or_components: a bool makes this a leaf with that status;
                anything else is treated as the list of child reports.
            error: optional message describing why the component is unhealthy.
        """
        self.name, self.error = name, error

        if not isinstance(status_or_components, bool):
            # Node: health is derived from the children.
            children = status_or_components
            self.status = all(child.status for child in children)
            self.components = children
            return

        # Leaf: health was stated explicitly.
        self.status = status_or_components
        self.components = []

    def to_dict(self) -> Dict:
        """
        Serialise this report and, recursively, all of its children.

        Returns:
            A dict holding "status" ("UP" or "DOWN"), one entry per child
            keyed by the child's name, and "error" when an error is set.
        """
        report = {"status": "UP" if self.status else "DOWN"}

        for child in self.components:
            report[child.name] = child.to_dict()

        if self.error:
            report["error"] = self.error

        return report


def deep_health_check() -> Dict:
    """
    Run every health probe and return the combined report as a dict.

    The report covers:
    - database (the User and Service tables hold at least the configured
      minimum number of items),
    - celery worker (a worker answers the `ping` task),
    - databus (a stomp connection to the databus endpoint can be opened),
    - recommender engines (every engine in `ENGINES` can be instantiated).
    """
    database = HealthStatus(
        "database",
        [
            _check_tables_health(User, HEALTHY_USERS_THRESHOLD),
            _check_tables_health(Service, HEALTHY_SERVICES_THRESHOLD),
        ],
    )
    celery_worker = _check_celery_workers()
    databus = _check_databus_connection()
    engines = HealthStatus(
        "recommender_engines",
        [_check_engines_health(name, kind) for name, kind in ENGINES.items()],
    )

    # The root is healthy only when every sub-report above is healthy.
    overall = HealthStatus("service", [database, celery_worker, databus, engines])
    return overall.to_dict()


def _check_celery_workers() -> HealthStatus:
    """
    Check that a celery worker picks up and answers the `ping` task.

    Returns:
        A healthy "celery_worker" status when the task result arrives,
        otherwise an unhealthy one carrying a message for the failure.
    """
    component = "celery_worker"
    try:
        pending = ping.apply_async(retry=False, time_limit=3)
        # Block until the result is available or the timeout elapses.
        pending.get(timeout=3)
    except celery.exceptions.TimeoutError:
        failure = "Couldn't reach a celery worker"
    except kombu.exceptions.OperationalError:
        failure = "Redis backend is not operational"
    else:
        return HealthStatus(component, True)

    return HealthStatus(component, False, failure)


def _check_tables_health(
    model: Document.__class__, rows_threshold: int
) -> HealthStatus:
    """
    Check that the table behind `model` is reachable and holds enough items.

    Args:
        model: document class whose items are counted.
        rows_threshold: minimum item count for the table to be healthy.

    Returns:
        A status named "<model name>_table" (lower-cased); unhealthy when
        counting fails with a server selection timeout or when fewer than
        `rows_threshold` items exist.
    """
    component = f"{model.__name__}_table".lower()
    try:
        total = model.objects.count()
    except pymongo.errors.ServerSelectionTimeoutError:
        return HealthStatus(component, False, "Error during fetching items")

    enough = not total < rows_threshold
    error = None if enough else "Too few items"
    return HealthStatus(component, enough, error)


def _check_engines_health(
    engine_name: str, engine_type: BaseInferenceComponent.__class__
) -> HealthStatus:
    """
    Check that an engine can be instantiated.

    Args:
        engine_name: name the resulting status is reported under.
        engine_type: engine class, instantiated with the module-level `K`.

    Returns:
        A healthy status when instantiation succeeds, an unhealthy one when
        it raises `NoSavedMLComponentError`.
    """
    try:
        engine_type(K=K)
    except NoSavedMLComponentError:
        # No saved model for this engine in the DB means it is unhealthy.
        healthy, error = False, "Missing model in db"
    else:
        healthy, error = True, None

    return HealthStatus(engine_name, healthy, error)


def _check_databus_connection() -> HealthStatus:
    """
    Check that a stomp connection to the databus can be opened and closed.

    The endpoint and credentials are read from the `RS_DATABUS_*` entries of
    the current Flask app's config.

    Returns:
        A healthy "databus" status when connecting and disconnecting succeed,
        an unhealthy one when stomp raises `ConnectFailedException`.
    """
    component = "databus"

    settings = current_app.config
    host = settings["RS_DATABUS_HOST"]
    port = settings["RS_DATABUS_PORT"]
    username = settings["RS_DATABUS_USERNAME"]
    password = settings["RS_DATABUS_PASSWORD"]
    enable_ssl = settings["RS_DATABUS_SSL"]

    try:
        connection = stomp.Connection([(host, port)])
        # NOTE(review): stomp.py documents the credential keyword of connect()
        # as `passcode`; confirm that `password=` and `ssl=` are honoured here
        # rather than being forwarded as plain STOMP headers.
        connection.connect(
            username=username, password=password, wait=True, ssl=enable_ssl
        )
        connection.disconnect()
    except stomp.exception.ConnectFailedException:
        return HealthStatus(component, False, "Connection to databus failed")

    return HealthStatus(component, True)
6 changes: 6 additions & 0 deletions recommender/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,9 @@ def send_recommendation_to_databus(context: dict, recommendation_response: dict)
)

conn.disconnect()


@celery.task
def ping() -> bool:
    """Trivial task used to check that celery workers can be reached.

    Returns:
        Always `True`.
    """
    return True
32 changes: 16 additions & 16 deletions tests/services/test_engine_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ def test_get_default_recommendation_alg(get_engines):
assert get_default_recommendation_alg(get_engines.keys()) == "RL"
os.environ["DEFAULT_RECOMMENDATION_ALG"] = "NCF"
assert get_default_recommendation_alg(get_engines.keys()) == "NCF"
os.environ["DEFAULT_RECOMMENDATION_ALG"] = "random"
assert get_default_recommendation_alg(get_engines.keys()) == "random"
os.environ["DEFAULT_RECOMMENDATION_ALG"] = "Random"
assert get_default_recommendation_alg(get_engines.keys()) == "Random"

os.environ["DEFAULT_RECOMMENDATION_ALG"] = "NFC"
assert get_default_recommendation_alg(get_engines.keys()) == "RL"
Expand All @@ -81,8 +81,8 @@ def test_get_default_recommendation_alg(get_engines):
assert get_default_recommendation_alg(get_engines.keys()) == "RL"
os.environ["DEFAULT_RECOMMENDATION_ALG"] = "ncf"
assert get_default_recommendation_alg(get_engines.keys()) == "NCF"
os.environ["DEFAULT_RECOMMENDATION_ALG"] = "Random"
assert get_default_recommendation_alg(get_engines.keys()) == "random"
os.environ["DEFAULT_RECOMMENDATION_ALG"] = "random"
assert get_default_recommendation_alg(get_engines.keys()) == "Random"


def test_get_engine_names(get_engines):
Expand All @@ -94,8 +94,8 @@ def test_get_engine_names(get_engines):
3) Any engine name that exists.
"""

engine_from_req = ["RL", "NCF", "random"]
default_engine = ["RL", "NCF", "random"]
engine_from_req = ["RL", "NCF", "Random"]
default_engine = ["RL", "NCF", "Random"]
engines_keys = list(get_engines.keys())

# 1. case - engine_from_req is specified, and engines_keys are the same as the range of engine_from_req
Expand Down Expand Up @@ -128,8 +128,8 @@ def test_get_engine_names(get_engines):
assert engine_names == expected_eg_names

# 4. case - engines_keys includes more engines than engine_from_req and default_engine
engine_from_req = ("RL", "NCF", "random")
default_engine = ("RL", "NCF", "random")
engine_from_req = ("RL", "NCF", "Random")
default_engine = ("RL", "NCF", "Random")
# Last index of engines_keys, so it is expected to be the last element of returned names
engines_keys.append("New Engine")

Expand Down Expand Up @@ -170,10 +170,10 @@ def test_engine_loader(
assert type(engine) == NCFInferenceComponent
assert engine_name == "NCF"

engine_names = ["random", "NCF", "RL"]
engine_names = ["Random", "NCF", "RL"]
engine, engine_name = engine_loader(engine_names, engines, K)
assert type(engine) == RandomInferenceComponent
assert engine_name == "random"
assert engine_name == "Random"

# 2. case no ML engine is saved and random engine is not passed
engine_names = ["NCF", "RL"]
Expand All @@ -186,10 +186,10 @@ def test_engine_loader(
engine_loader(engine_names, engines, K)

# 3. case no ML engine is saved but random engine is also passed
engine_names = ["NCF", "RL", "random"]
engine_names = ["NCF", "RL", "Random"]
engine, engine_name = engine_loader(engine_names, engines, K)
assert type(engine) == RandomInferenceComponent
assert engine_name == "random"
assert engine_name == "Random"


def test_engine_loader2(
Expand Down Expand Up @@ -245,11 +245,11 @@ def test_load_engine(
# 2. user is random
del recommendation_data["user_id"]

for engine_version in {"RL", "NCF", "random", "placeholder"}:
for engine_version in {"RL", "NCF", "Random", "random", "placeholder"}:
recommendation_data["engine_version"] = engine_version
engine, engine_name = load_engine(recommendation_data)
assert type(engine) == RandomInferenceComponent
assert engine_name == "random"
assert engine_name == "Random"


def test_load_engine_with_aai_uid(
Expand Down Expand Up @@ -277,8 +277,8 @@ def test_load_engine_with_aai_uid(
# 2. user is random
del recommendation_data_with_aai_uid["aai_uid"]

for engine_version in {"RL", "NCF", "random", "placeholder"}:
for engine_version in {"RL", "NCF", "random", "Random", "placeholder"}:
recommendation_data_with_aai_uid["engine_version"] = engine_version
engine, engine_name = load_engine(recommendation_data_with_aai_uid)
assert type(engine) == RandomInferenceComponent
assert engine_name == "random"
assert engine_name == "Random"

0 comments on commit 84607a6

Please sign in to comment.