From aebccdb4e7ffb5923928f4fe33227afe5447c180 Mon Sep 17 00:00:00 2001 From: Madison Swain-Bowden Date: Tue, 26 Dec 2023 14:55:18 -0800 Subject: [PATCH 1/5] Move database connection into db_helpers --- ingestion_server/ingestion_server/cleanup.py | 4 +- .../ingestion_server/db_helpers.py | 46 +++++++++++++++++++ ingestion_server/ingestion_server/indexer.py | 36 +-------------- ingestion_server/ingestion_server/ingest.py | 2 +- 4 files changed, 50 insertions(+), 38 deletions(-) create mode 100644 ingestion_server/ingestion_server/db_helpers.py diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py index 5bc2e72d76d..e7a5dfa3058 100644 --- a/ingestion_server/ingestion_server/cleanup.py +++ b/ingestion_server/ingestion_server/cleanup.py @@ -14,8 +14,8 @@ import tldextract from psycopg2.extras import DictCursor, Json -from ingestion_server.indexer import DB_BUFFER_SIZE, database_connect - +from ingestion_server.indexer import DB_BUFFER_SIZE +from ingestion_server.db_helpers import database_connect # Number of records to buffer in memory at once CLEANUP_BUFFER_SIZE = DB_BUFFER_SIZE diff --git a/ingestion_server/ingestion_server/db_helpers.py b/ingestion_server/ingestion_server/db_helpers.py new file mode 100644 index 00000000000..2bb8664db83 --- /dev/null +++ b/ingestion_server/ingestion_server/db_helpers.py @@ -0,0 +1,46 @@ +import logging as log +import time + +import psycopg2 +from decouple import config + +DATABASE_HOST = config("DATABASE_HOST", default="localhost") +DATABASE_PORT = config("DATABASE_PORT", default=5432, cast=int) +DATABASE_USER = config("DATABASE_USER", default="deploy") +DATABASE_PASSWORD = config("DATABASE_PASSWORD", default="deploy") +DATABASE_NAME = config("DATABASE_NAME", default="openledger") + + +def database_connect( + autocommit: bool = False, + timeout: int = 5, + attempt_reconnect: bool = True, +): + """ + Repeatedly try to connect to the downstream (API) database until successful + (unless otherwise specified). + + :return: A database connection object + """ + while True: + try: + conn = psycopg2.connect( + dbname=DATABASE_NAME, + user=DATABASE_USER, + password=DATABASE_PASSWORD, + host=DATABASE_HOST, + port=DATABASE_PORT, + connect_timeout=timeout, + ) + if autocommit: + conn.set_session(autocommit=True) + except psycopg2.OperationalError as e: + if not attempt_reconnect: + raise e + log.exception(e) + log.error("Reconnecting to database in 5 seconds. . .") + time.sleep(5) + continue + break + + return conn diff --git a/ingestion_server/ingestion_server/indexer.py b/ingestion_server/ingestion_server/indexer.py index 34bed6a1498..530cb7a0f3a 100644 --- a/ingestion_server/ingestion_server/indexer.py +++ b/ingestion_server/ingestion_server/indexer.py @@ -23,7 +23,6 @@ from typing import Any import elasticsearch -import psycopg2 import requests from decouple import config from elasticsearch import Elasticsearch, helpers @@ -32,6 +31,7 @@ from requests import RequestException from ingestion_server import slack +from ingestion_server.db_helpers import database_connect from ingestion_server.distributed_reindex_scheduler import schedule_distributed_index from ingestion_server.elasticsearch_models import media_type_to_elasticsearch_model from ingestion_server.es_helpers import get_stat @@ -40,12 +40,6 @@ from ingestion_server.utils.sensitive_terms import get_sensitive_terms -DATABASE_HOST = config("DATABASE_HOST", default="localhost") -DATABASE_PORT = config("DATABASE_PORT", default=5432, cast=int) -DATABASE_USER = config("DATABASE_USER", default="deploy") -DATABASE_PASSWORD = config("DATABASE_PASSWORD", default="deploy") -DATABASE_NAME = config("DATABASE_NAME", default="openledger") - # See https://www.elastic.co/guide/en/elasticsearch/reference/8.8/docs-reindex.html#docs-reindex-throttle ES_FILTERED_INDEX_THROTTLING_RATE = config( "ES_FILTERED_INDEX_THROTTLING_RATE", default=20_000, cast=int @@ -63,34 +57,6 @@ ) -def database_connect(autocommit=False): - """ - Repeatedly try to connect to the downstream (API) database until successful. - - :return: A database connection object - """ - while True: - try: - conn = psycopg2.connect( - dbname=DATABASE_NAME, - user=DATABASE_USER, - password=DATABASE_PASSWORD, - host=DATABASE_HOST, - port=DATABASE_PORT, - connect_timeout=5, - ) - if autocommit: - conn.set_session(autocommit=True) - except psycopg2.OperationalError as e: - log.exception(e) - log.error("Reconnecting to database in 5 seconds. . .") - time.sleep(5) - continue - break - - return conn - - def get_last_item_ids(table): """ Find the last item added to Postgres and return both its sequential ID and UUID. diff --git a/ingestion_server/ingestion_server/ingest.py b/ingestion_server/ingestion_server/ingest.py index ceeb2bdbf25..0f786668895 100644 --- a/ingestion_server/ingestion_server/ingest.py +++ b/ingestion_server/ingestion_server/ingest.py @@ -26,7 +26,7 @@ from ingestion_server import slack from ingestion_server.cleanup import clean_image_data from ingestion_server.constants.internal_types import ApproachType -from ingestion_server.indexer import database_connect +from ingestion_server.db_helpers import database_connect from ingestion_server.queries import ( get_copy_data_query, get_create_ext_query, From bbb611a43aaf88247dddda3eb0bbf478df13721a Mon Sep 17 00:00:00 2001 From: Madison Swain-Bowden Date: Tue, 26 Dec 2023 15:11:58 -0800 Subject: [PATCH 2/5] Simplify database config and references --- .../ingestion_server/db_helpers.py | 42 ++++++++++++------- ingestion_server/ingestion_server/ingest.py | 27 ++++-------- 2 files changed, 35 insertions(+), 34 deletions(-) diff --git a/ingestion_server/ingestion_server/db_helpers.py b/ingestion_server/ingestion_server/db_helpers.py index 2bb8664db83..92090b648eb 100644 --- a/ingestion_server/ingestion_server/db_helpers.py +++ b/ingestion_server/ingestion_server/db_helpers.py @@ -1,18 +1,39 @@ import logging as log import time +from typing import NamedTuple import psycopg2 from decouple import config -DATABASE_HOST = config("DATABASE_HOST", default="localhost") -DATABASE_PORT = config("DATABASE_PORT", default=5432, cast=int) -DATABASE_USER = config("DATABASE_USER", default="deploy") -DATABASE_PASSWORD = config("DATABASE_PASSWORD", default="deploy") -DATABASE_NAME = config("DATABASE_NAME", default="openledger") + +class DbConfig(NamedTuple): + host: str + port: int + user: str + password: str + dbname: str + + +DB_API_CONFIG = DbConfig( + host=config("DATABASE_HOST", default="localhost"), + port=config("DATABASE_PORT", default=5432, cast=int), + user=config("DATABASE_USER", default="deploy"), + password=config("DATABASE_PASSWORD", default="deploy"), + dbname=config("DATABASE_NAME", default="openledger"), +) + +DB_UPSTREAM_CONFIG = DbConfig( + host=config("UPSTREAM_DB_HOST", default="localhost"), + port=config("UPSTREAM_DB_PORT", default=5433, cast=int), + user=config("UPSTREAM_DB_USER", default="deploy"), + password=config("UPSTREAM_DB_PASSWORD", default="deploy"), + dbname=config("UPSTREAM_DB_NAME", default="openledger"), +) def database_connect( autocommit: bool = False, + dbconfig: DbConfig = DB_API_CONFIG, timeout: int = 5, attempt_reconnect: bool = True, ): @@ -24,19 +45,12 @@ def database_connect( """ while True: try: - conn = psycopg2.connect( - dbname=DATABASE_NAME, - user=DATABASE_USER, - password=DATABASE_PASSWORD, - host=DATABASE_HOST, - port=DATABASE_PORT, - connect_timeout=timeout, - ) + conn = psycopg2.connect(**dbconfig._asdict(), connect_timeout=timeout) if autocommit: conn.set_session(autocommit=True) except psycopg2.OperationalError as e: if not attempt_reconnect: - raise e + return None log.exception(e) log.error("Reconnecting to database in 5 seconds. . .") time.sleep(5) diff --git a/ingestion_server/ingestion_server/ingest.py b/ingestion_server/ingestion_server/ingest.py index 0f786668895..f400f10ffd0 100644 --- a/ingestion_server/ingestion_server/ingest.py +++ b/ingestion_server/ingestion_server/ingest.py @@ -26,7 +26,7 @@ from ingestion_server import slack from ingestion_server.cleanup import clean_image_data from ingestion_server.constants.internal_types import ApproachType -from ingestion_server.db_helpers import database_connect +from ingestion_server.db_helpers import database_connect, DB_UPSTREAM_CONFIG from ingestion_server.queries import ( get_copy_data_query, get_create_ext_query, @@ -36,21 +36,15 @@ from ingestion_server.utils.config import get_record_limit -UPSTREAM_DB_HOST = config("UPSTREAM_DB_HOST", default="localhost") -UPSTREAM_DB_PORT = config("UPSTREAM_DB_PORT", default=5433, cast=int) -UPSTREAM_DB_USER = config("UPSTREAM_DB_USER", default="deploy") -UPSTREAM_DB_PASSWORD = config("UPSTREAM_DB_PASSWORD", default="deploy") -UPSTREAM_DB_NAME = config("UPSTREAM_DB_NAME", default="openledger") - RELATIVE_UPSTREAM_DB_HOST = config( "RELATIVE_UPSTREAM_DB_HOST", - default=UPSTREAM_DB_HOST, + default=DB_UPSTREAM_CONFIG.host, ) #: the hostname of the upstream DB from the POV of the downstream DB RELATIVE_UPSTREAM_DB_PORT = config( "RELATIVE_UPSTREAM_DB_PORT", - default=UPSTREAM_DB_PORT, + default=DB_UPSTREAM_CONFIG.port, cast=int, ) #: the port of the upstream DB from the POV of the downstream DB @@ -282,14 +276,7 @@ def refresh_api_table( "Starting ingestion server data refresh | _Next: copying data from upstream_", ) downstream_db = database_connect() - upstream_db = psycopg2.connect( - dbname=UPSTREAM_DB_NAME, - user=UPSTREAM_DB_USER, - port=UPSTREAM_DB_PORT, - password=UPSTREAM_DB_PASSWORD, - host=UPSTREAM_DB_HOST, - connect_timeout=5, - ) + upstream_db = database_connect(dbconfig=DB_UPSTREAM_CONFIG) shared_cols = _get_shared_cols( downstream_db, upstream_db, upstream_table, downstream_table ) @@ -309,9 +296,9 @@ def refresh_api_table( init_fdw = get_fdw_query( RELATIVE_UPSTREAM_DB_HOST, RELATIVE_UPSTREAM_DB_PORT, - UPSTREAM_DB_NAME, - UPSTREAM_DB_USER, - UPSTREAM_DB_PASSWORD, + DB_UPSTREAM_CONFIG.dbname, + DB_UPSTREAM_CONFIG.user, + DB_UPSTREAM_CONFIG.password, upstream_table, ) downstream_cur.execute(init_fdw) From 2dd0cd13bb1ad43c56889dab817bcc5d3e4be197 Mon Sep 17 00:00:00 2001 From: Madison Swain-Bowden Date: Tue, 26 Dec 2023 15:23:08 -0800 Subject: [PATCH 3/5] Add more thorough health check for ingestion server --- ingestion_server/ingestion_server/api.py | 38 +++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/ingestion_server/ingestion_server/api.py b/ingestion_server/ingestion_server/api.py index 240a095b400..7efb60dcea0 100644 --- a/ingestion_server/ingestion_server/api.py +++ b/ingestion_server/ingestion_server/api.py @@ -15,6 +15,11 @@ from ingestion_server import slack from ingestion_server.constants.media_types import MEDIA_TYPES, MediaType +from ingestion_server.db_helpers import ( + DB_UPSTREAM_CONFIG, + DB_API_CONFIG, + database_connect, +) from ingestion_server.es_helpers import elasticsearch_connect, get_stat from ingestion_server.indexer import TableIndexer from ingestion_server.state import clear_state, worker_finished @@ -38,10 +43,41 @@ class HealthResource: @staticmethod - def on_get(_, resp): + def on_get(req, resp): + # Set the initial response, but change it if necessary resp.status = falcon.HTTP_200 resp.media = {"status": "200 OK"} + if not req.get_param_as_bool("check_deps", blank_as_true=False): + return + + messages = [] + # Elasticsearch checks + es = elasticsearch_connect(timeout=3) + if not es: + messages.append("Elasticsearch could not be reached") + else: + es_health = es.cluster.health(timeout="3s") + if es_health["timed_out"]: + messages.append("Elasticsearch health check timed out") + if (es_status := es_health["status"]) != "green": + messages.append(f"Elasticsearch cluster health: {es_status}") + + # Database checks + for name, dbconfig in zip( + ["upstream", "api"], + [DB_UPSTREAM_CONFIG, DB_API_CONFIG], + ): + db = database_connect(dbconfig=dbconfig, timeout=3, attempt_reconnect=False) + if not db: + messages.append( + f"Database connection for '{name}' could not be established" + ) + + if messages: + resp.status = falcon.HTTP_503 + resp.media = {"status": messages} + class StatResource: @staticmethod From 14d8adcbbc775f8181af148fae7471130475edc9 Mon Sep 17 00:00:00 2001 From: Madison Swain-Bowden Date: Tue, 26 Dec 2023 15:24:51 -0800 Subject: [PATCH 4/5] Docs --- ingestion_server/ingestion_server/api.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ingestion_server/ingestion_server/api.py b/ingestion_server/ingestion_server/api.py index 7efb60dcea0..9614fcbf0b6 100644 --- a/ingestion_server/ingestion_server/api.py +++ b/ingestion_server/ingestion_server/api.py @@ -44,6 +44,10 @@ class HealthResource: @staticmethod def on_get(req, resp): + """ + Health check for the service. Optionally check on resources with the + check_deps=true parameter. + """ # Set the initial response, but change it if necessary resp.status = falcon.HTTP_200 resp.media = {"status": "200 OK"} From 72263b33d8b6567cd0551133a338c00b17ecb6d9 Mon Sep 17 00:00:00 2001 From: Madison Swain-Bowden Date: Tue, 16 Jan 2024 16:49:14 -0800 Subject: [PATCH 5/5] Update shape and content of response --- ingestion_server/ingestion_server/api.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/ingestion_server/ingestion_server/api.py b/ingestion_server/ingestion_server/api.py index 9614fcbf0b6..dc004a8674a 100644 --- a/ingestion_server/ingestion_server/api.py +++ b/ingestion_server/ingestion_server/api.py @@ -1,4 +1,5 @@ """A small RPC API server for scheduling data refresh and indexing tasks.""" +from collections import defaultdict import logging import os @@ -55,17 +56,17 @@ def on_get(req, resp): if not req.get_param_as_bool("check_deps", blank_as_true=False): return - messages = [] + messages = defaultdict(list) # Elasticsearch checks es = elasticsearch_connect(timeout=3) if not es: - messages.append("Elasticsearch could not be reached") + messages["es"].append("Elasticsearch could not be reached") else: es_health = es.cluster.health(timeout="3s") if es_health["timed_out"]: - messages.append("Elasticsearch health check timed out") + messages["es"].append("Elasticsearch health check timed out") if (es_status := es_health["status"]) != "green": - messages.append(f"Elasticsearch cluster health: {es_status}") + messages["es"].append(f"Elasticsearch cluster health: {es_status}") # Database checks for name, dbconfig in zip( @@ -74,13 +75,13 @@ def on_get(req, resp): ): db = database_connect(dbconfig=dbconfig, timeout=3, attempt_reconnect=False) if not db: - messages.append( + messages["db"].append( f"Database connection for '{name}' could not be established" ) if messages: resp.status = falcon.HTTP_503 - resp.media = {"status": messages} + resp.media = {"status": "503 Service Unavailable", "dependencies": messages} class StatResource: