diff --git a/ingestion_server/ingestion_server/api.py b/ingestion_server/ingestion_server/api.py
index 240a095b400..dc004a8674a 100644
--- a/ingestion_server/ingestion_server/api.py
+++ b/ingestion_server/ingestion_server/api.py
@@ -1,4 +1,5 @@
 """A small RPC API server for scheduling data refresh and indexing tasks."""
+from collections import defaultdict
 
 import logging
 import os
@@ -15,6 +16,11 @@
 
 from ingestion_server import slack
 from ingestion_server.constants.media_types import MEDIA_TYPES, MediaType
+from ingestion_server.db_helpers import (
+    DB_UPSTREAM_CONFIG,
+    DB_API_CONFIG,
+    database_connect,
+)
 from ingestion_server.es_helpers import elasticsearch_connect, get_stat
 from ingestion_server.indexer import TableIndexer
 from ingestion_server.state import clear_state, worker_finished
@@ -38,10 +44,45 @@
 
 class HealthResource:
     @staticmethod
-    def on_get(_, resp):
+    def on_get(req, resp):
+        """
+        Health check for the service. Optionally check on resources with the
+        check_deps=true parameter.
+        """
+        # Set the initial response, but change it if necessary
         resp.status = falcon.HTTP_200
         resp.media = {"status": "200 OK"}
+        if not req.get_param_as_bool("check_deps", blank_as_true=False):
+            return
+
+        messages = defaultdict(list)
+        # Elasticsearch checks
+        es = elasticsearch_connect(timeout=3)
+        if not es:
+            messages["es"].append("Elasticsearch could not be reached")
+        else:
+            es_health = es.cluster.health(timeout="3s")
+            if es_health["timed_out"]:
+                messages["es"].append("Elasticsearch health check timed out")
+            if (es_status := es_health["status"]) != "green":
+                messages["es"].append(f"Elasticsearch cluster health: {es_status}")
+
+        # Database checks
+        for name, dbconfig in zip(
+            ["upstream", "api"],
+            [DB_UPSTREAM_CONFIG, DB_API_CONFIG],
+        ):
+            db = database_connect(dbconfig=dbconfig, timeout=3, attempt_reconnect=False)
+            if not db:
+                messages["db"].append(
+                    f"Database connection for '{name}' could not be established"
+                )
+
+        if messages:
+            resp.status = falcon.HTTP_503
+            resp.media = {"status": "503 Service Unavailable", "dependencies": messages}
+
 
 
 class StatResource:
     @staticmethod
diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py
index 5bc2e72d76d..e7a5dfa3058 100644
--- a/ingestion_server/ingestion_server/cleanup.py
+++ b/ingestion_server/ingestion_server/cleanup.py
@@ -14,8 +14,8 @@
 import tldextract
 from psycopg2.extras import DictCursor, Json
 
-from ingestion_server.indexer import DB_BUFFER_SIZE, database_connect
-
+from ingestion_server.indexer import DB_BUFFER_SIZE
+from ingestion_server.db_helpers import database_connect
 
 # Number of records to buffer in memory at once
 CLEANUP_BUFFER_SIZE = DB_BUFFER_SIZE
diff --git a/ingestion_server/ingestion_server/db_helpers.py b/ingestion_server/ingestion_server/db_helpers.py
new file mode 100644
index 00000000000..92090b648eb
--- /dev/null
+++ b/ingestion_server/ingestion_server/db_helpers.py
@@ -0,0 +1,60 @@
+import logging as log
+import time
+from typing import NamedTuple
+
+import psycopg2
+from decouple import config
+
+
+class DbConfig(NamedTuple):
+    host: str
+    port: int
+    user: str
+    password: str
+    dbname: str
+
+
+DB_API_CONFIG = DbConfig(
+    host=config("DATABASE_HOST", default="localhost"),
+    port=config("DATABASE_PORT", default=5432, cast=int),
+    user=config("DATABASE_USER", default="deploy"),
+    password=config("DATABASE_PASSWORD", default="deploy"),
+    dbname=config("DATABASE_NAME", default="openledger"),
+)
+
+DB_UPSTREAM_CONFIG = DbConfig(
+    host=config("UPSTREAM_DB_HOST", default="localhost"),
+    port=config("UPSTREAM_DB_PORT", default=5433, cast=int),
+    user=config("UPSTREAM_DB_USER", default="deploy"),
+    password=config("UPSTREAM_DB_PASSWORD", default="deploy"),
+    dbname=config("UPSTREAM_DB_NAME", default="openledger"),
+)
+
+
+def database_connect(
+    autocommit: bool = False,
+    dbconfig: DbConfig = DB_API_CONFIG,
+    timeout: int = 5,
+    attempt_reconnect: bool = True,
+):
+    """
+    Repeatedly try to connect to the downstream (API) database until successful
+    (unless otherwise specified).
+
+    :return: A database connection object
+    """
+    while True:
+        try:
+            conn = psycopg2.connect(**dbconfig._asdict(), connect_timeout=timeout)
+            if autocommit:
+                conn.set_session(autocommit=True)
+        except psycopg2.OperationalError as e:
+            if not attempt_reconnect:
+                return None
+            log.exception(e)
+            log.error("Reconnecting to database in 5 seconds. . .")
+            time.sleep(5)
+            continue
+        break
+
+    return conn
diff --git a/ingestion_server/ingestion_server/indexer.py b/ingestion_server/ingestion_server/indexer.py
index 34bed6a1498..530cb7a0f3a 100644
--- a/ingestion_server/ingestion_server/indexer.py
+++ b/ingestion_server/ingestion_server/indexer.py
@@ -23,7 +23,6 @@
 from typing import Any
 
 import elasticsearch
-import psycopg2
 import requests
 from decouple import config
 from elasticsearch import Elasticsearch, helpers
@@ -32,6 +31,7 @@
 from requests import RequestException
 
 from ingestion_server import slack
+from ingestion_server.db_helpers import database_connect
 from ingestion_server.distributed_reindex_scheduler import schedule_distributed_index
 from ingestion_server.elasticsearch_models import media_type_to_elasticsearch_model
 from ingestion_server.es_helpers import get_stat
@@ -40,12 +40,6 @@
 from ingestion_server.utils.sensitive_terms import get_sensitive_terms
 
 
-DATABASE_HOST = config("DATABASE_HOST", default="localhost")
-DATABASE_PORT = config("DATABASE_PORT", default=5432, cast=int)
-DATABASE_USER = config("DATABASE_USER", default="deploy")
-DATABASE_PASSWORD = config("DATABASE_PASSWORD", default="deploy")
-DATABASE_NAME = config("DATABASE_NAME", default="openledger")
-
 # See https://www.elastic.co/guide/en/elasticsearch/reference/8.8/docs-reindex.html#docs-reindex-throttle
 ES_FILTERED_INDEX_THROTTLING_RATE = config(
     "ES_FILTERED_INDEX_THROTTLING_RATE", default=20_000, cast=int
@@ -63,34 +57,6 @@
 )
 
 
-def database_connect(autocommit=False):
-    """
-    Repeatedly try to connect to the downstream (API) database until successful.
-
-    :return: A database connection object
-    """
-    while True:
-        try:
-            conn = psycopg2.connect(
-                dbname=DATABASE_NAME,
-                user=DATABASE_USER,
-                password=DATABASE_PASSWORD,
-                host=DATABASE_HOST,
-                port=DATABASE_PORT,
-                connect_timeout=5,
-            )
-            if autocommit:
-                conn.set_session(autocommit=True)
-        except psycopg2.OperationalError as e:
-            log.exception(e)
-            log.error("Reconnecting to database in 5 seconds. . .")
-            time.sleep(5)
-            continue
-        break
-
-    return conn
-
-
 def get_last_item_ids(table):
     """
     Find the last item added to Postgres and return both its sequential ID and UUID.
diff --git a/ingestion_server/ingestion_server/ingest.py b/ingestion_server/ingestion_server/ingest.py
index ceeb2bdbf25..f400f10ffd0 100644
--- a/ingestion_server/ingestion_server/ingest.py
+++ b/ingestion_server/ingestion_server/ingest.py
@@ -26,7 +26,7 @@
 from ingestion_server import slack
 from ingestion_server.cleanup import clean_image_data
 from ingestion_server.constants.internal_types import ApproachType
-from ingestion_server.indexer import database_connect
+from ingestion_server.db_helpers import database_connect, DB_UPSTREAM_CONFIG
 from ingestion_server.queries import (
     get_copy_data_query,
     get_create_ext_query,
@@ -36,21 +36,15 @@
 from ingestion_server.utils.config import get_record_limit
 
 
-UPSTREAM_DB_HOST = config("UPSTREAM_DB_HOST", default="localhost")
-UPSTREAM_DB_PORT = config("UPSTREAM_DB_PORT", default=5433, cast=int)
-UPSTREAM_DB_USER = config("UPSTREAM_DB_USER", default="deploy")
-UPSTREAM_DB_PASSWORD = config("UPSTREAM_DB_PASSWORD", default="deploy")
-UPSTREAM_DB_NAME = config("UPSTREAM_DB_NAME", default="openledger")
-
 RELATIVE_UPSTREAM_DB_HOST = config(
     "RELATIVE_UPSTREAM_DB_HOST",
-    default=UPSTREAM_DB_HOST,
+    default=DB_UPSTREAM_CONFIG.host,
 )
 #: the hostname of the upstream DB from the POV of the downstream DB
 
 RELATIVE_UPSTREAM_DB_PORT = config(
     "RELATIVE_UPSTREAM_DB_PORT",
-    default=UPSTREAM_DB_PORT,
+    default=DB_UPSTREAM_CONFIG.port,
     cast=int,
 )
 #: the port of the upstream DB from the POV of the downstream DB
@@ -282,14 +276,7 @@ def refresh_api_table(
         "Starting ingestion server data refresh | _Next: copying data from upstream_",
     )
     downstream_db = database_connect()
-    upstream_db = psycopg2.connect(
-        dbname=UPSTREAM_DB_NAME,
-        user=UPSTREAM_DB_USER,
-        port=UPSTREAM_DB_PORT,
-        password=UPSTREAM_DB_PASSWORD,
-        host=UPSTREAM_DB_HOST,
-        connect_timeout=5,
-    )
+    upstream_db = database_connect(dbconfig=DB_UPSTREAM_CONFIG)
     shared_cols = _get_shared_cols(
         downstream_db, upstream_db, upstream_table, downstream_table
     )
@@ -309,9 +296,9 @@
     init_fdw = get_fdw_query(
         RELATIVE_UPSTREAM_DB_HOST,
         RELATIVE_UPSTREAM_DB_PORT,
-        UPSTREAM_DB_NAME,
-        UPSTREAM_DB_USER,
-        UPSTREAM_DB_PASSWORD,
+        DB_UPSTREAM_CONFIG.dbname,
+        DB_UPSTREAM_CONFIG.user,
+        DB_UPSTREAM_CONFIG.password,
         upstream_table,
     )
     downstream_cur.execute(init_fdw)