diff --git a/ingestion_server/ingestion_server/distributed_reindex_scheduler.py b/ingestion_server/ingestion_server/distributed_reindex_scheduler.py
index 69d07b3060e..433f69cddda 100644
--- a/ingestion_server/ingestion_server/distributed_reindex_scheduler.py
+++ b/ingestion_server/ingestion_server/distributed_reindex_scheduler.py
@@ -9,18 +9,18 @@
 """
 import logging as log
 import math
-import os
 import socket
 import time
 
 import boto3
 import requests
+from decouple import config
 
 from ingestion_server.constants.media_types import MEDIA_TYPES
 from ingestion_server.state import register_indexing_job
 
 
-client = boto3.client("ec2", region_name=os.getenv("AWS_REGION", "us-east-1"))
+client = boto3.client("ec2", region_name=config("AWS_REGION", default="us-east-1"))
 
 
 def schedule_distributed_index(db_conn, target_index):
@@ -69,7 +69,7 @@ def _prepare_workers():
 
     :return: A list of private URLs pointing to each available indexing worker
     """
-    environment = os.getenv("ENVIRONMENT", "local")
+    environment = config("ENVIRONMENT", default="local")
     if environment == "local":
         return [socket.gethostbyname("indexer-worker")]
     instance_filters = [
diff --git a/ingestion_server/ingestion_server/indexer.py b/ingestion_server/ingestion_server/indexer.py
index dcb8c216597..0d6f06d465f 100644
--- a/ingestion_server/ingestion_server/indexer.py
+++ b/ingestion_server/ingestion_server/indexer.py
@@ -1,7 +1,6 @@
 import argparse
 import datetime
 import logging as log
-import os
 import sys
 import time
 import uuid
@@ -10,6 +9,7 @@
 import elasticsearch
 import psycopg2
 from aws_requests_auth.aws_auth import AWSRequestsAuth
+from decouple import config
 from elasticsearch import Elasticsearch, NotFoundError, RequestsHttpConnection, helpers
 from elasticsearch.exceptions import ConnectionError as ESConnectionError
 from elasticsearch_dsl import Search, connections
@@ -40,29 +40,30 @@
 """
 
 # For AWS IAM access to Elasticsearch
-AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", "")
-AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", "")
+AWS_ACCESS_KEY_ID = config("AWS_ACCESS_KEY_ID", default="")
+AWS_SECRET_ACCESS_KEY = config("AWS_SECRET_ACCESS_KEY", default="")
 
-ELASTICSEARCH_URL = os.getenv("ELASTICSEARCH_URL", "localhost")
-ELASTICSEARCH_PORT = int(os.getenv("ELASTICSEARCH_PORT", 9200))
+ELASTICSEARCH_URL = config("ELASTICSEARCH_URL", default="localhost")
+ELASTICSEARCH_PORT = config("ELASTICSEARCH_PORT", default=9200, cast=int)
 
-AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
+AWS_REGION = config("AWS_REGION", default="us-east-1")
 
-DATABASE_HOST = os.getenv("DATABASE_HOST", "localhost")
-DATABASE_PORT = int(os.getenv("DATABASE_PORT", 5432))
-DATABASE_USER = os.getenv("DATABASE_USER", "deploy")
-DATABASE_PASSWORD = os.getenv("DATABASE_PASSWORD", "deploy")
-DATABASE_NAME = os.getenv("DATABASE_NAME", "openledger")
+DATABASE_HOST = config("DATABASE_HOST", default="localhost")
+DATABASE_PORT = config("DATABASE_PORT", default=5432, cast=int)
+DATABASE_USER = config("DATABASE_USER", default="deploy")
+DATABASE_PASSWORD = config("DATABASE_PASSWORD", default="deploy")
+DATABASE_NAME = config("DATABASE_NAME", default="openledger")
 
 # The number of database records to load in memory at once.
-DB_BUFFER_SIZE = int(os.getenv("DB_BUFFER_SIZE", 100000))
+DB_BUFFER_SIZE = config("DB_BUFFER_SIZE", default=100000, cast=int)
 
-SYNCER_POLL_INTERVAL = int(os.getenv("SYNCER_POLL_INTERVAL", 60))
+SYNCER_POLL_INTERVAL = config("SYNCER_POLL_INTERVAL", default=60, cast=int)
 
 # A comma separated list of tables in the database table to replicate to
 # Elasticsearch. Ex: image,docs
-REP_TABLES = os.getenv("COPY_TABLES", "image")
-replicate_tables = REP_TABLES.split(",") if "," in REP_TABLES else [REP_TABLES]
+REP_TABLES = config(
+    "COPY_TABLES", default="image", cast=lambda var: [s.strip() for s in var.split(",")]
+)
 
 TWELVE_HOURS_SEC = 60 * 60 * 12
 
@@ -351,7 +352,7 @@ def go_live(write_index, live_alias):
         # because there will only be one node available. In production, there
         # are many nodes, and the index should not be promoted until all
         # shards have been initialized.
-        environment = os.getenv("ENVIRONMENT", "local")
+        environment = config("ENVIRONMENT", default="local")
         if environment != "local":
             log.info("Waiting for replica shards. . .")
             es.cluster.health(index=write_index, wait_for_status="green", timeout="12h")
@@ -401,7 +402,7 @@ def reindex(self, model_name: str, distributed=None):
         suffix = uuid.uuid4().hex
         destination_index = f"{model_name}-{suffix}"
         if distributed is None:
-            distributed = os.getenv("ENVIRONMENT", "local") != "local"
+            distributed = config("ENVIRONMENT", default="local") != "local"
         if distributed:
             self.es.indices.create(
                 index=destination_index, body=index_settings(model_name)
@@ -471,7 +472,7 @@ def pg_chunk_to_es(pg_chunk, columns, origin_table, dest_index):
     log.getLogger(TableIndexer.__name__).setLevel(log.INFO)
     log.info("Connecting to Elasticsearch")
     elasticsearch_client = elasticsearch_connect()
-    syncer = TableIndexer(elasticsearch_client, replicate_tables)
+    syncer = TableIndexer(elasticsearch_client, REP_TABLES)
     if parsed.reindex:
         log.info(f"Reindexing {parsed.reindex}")
         syncer.reindex(parsed.reindex)
diff --git a/ingestion_server/ingestion_server/indexer_worker.py b/ingestion_server/ingestion_server/indexer_worker.py
index 09dbeb4985e..967aff4f77d 100644
--- a/ingestion_server/ingestion_server/indexer_worker.py
+++ b/ingestion_server/ingestion_server/indexer_worker.py
@@ -6,13 +6,13 @@
 data has been indexed, notify Ingestion Server and stop the instance.
 """
 import logging as log
-import os
 import sys
 from multiprocessing import Process, Value
 
 import boto3
 import falcon
 import requests
+from decouple import config
 from psycopg2.sql import SQL, Identifier, Literal
 
 from ingestion_server.constants.media_types import MEDIA_TYPES
@@ -22,9 +22,9 @@
 
 ec2_client = boto3.client(
     "ec2",
-    region_name=os.getenv("AWS_REGION", "us-east-1"),
-    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID", None),
-    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY", None),
+    region_name=config("AWS_REGION", default="us-east-1"),
+    aws_access_key_id=config("AWS_ACCESS_KEY_ID", default=None),
+    aws_secret_access_key=config("AWS_SECRET_ACCESS_KEY", default=None),
 )
 
 
@@ -93,7 +93,7 @@ def _self_destruct():
     Stop this EC2 instance once the task is finished.
""" # Get instance ID from AWS metadata service - if os.getenv("ENVIRONMENT", "local") == "local": + if config("ENVIRONMENT", default="local") == "local": log.info("Skipping self destruction because worker is in local environment") return endpoint = "http://169.254.169.254/latest/meta-data/instance-id" diff --git a/ingestion_server/ingestion_server/ingest.py b/ingestion_server/ingestion_server/ingest.py index ba00286c6ef..69b6c1574a0 100644 --- a/ingestion_server/ingestion_server/ingest.py +++ b/ingestion_server/ingestion_server/ingest.py @@ -1,8 +1,8 @@ import datetime import logging as log -import os import psycopg2 +from decouple import config from psycopg2.extras import DictCursor from psycopg2.sql import SQL, Identifier, Literal @@ -33,16 +33,22 @@ data in place. """ -UPSTREAM_DB_HOST = os.getenv("UPSTREAM_DB_HOST", "localhost") -UPSTREAM_DB_PORT = int(os.getenv("UPSTREAM_DB_PORT", 5433)) -UPSTREAM_DB_USER = os.getenv("UPSTREAM_DB_USER", "deploy") -UPSTREAM_DB_PASSWORD = os.getenv("UPSTREAM_DB_PASSWORD", "deploy") -UPSTREAM_DB_NAME = os.getenv("UPSTREAM_DB_NAME", "openledger") +UPSTREAM_DB_HOST = config("UPSTREAM_DB_HOST", default="localhost") +UPSTREAM_DB_PORT = config("UPSTREAM_DB_PORT", default=5433, cast=int) +UPSTREAM_DB_USER = config("UPSTREAM_DB_USER", default="deploy") +UPSTREAM_DB_PASSWORD = config("UPSTREAM_DB_PASSWORD", default="deploy") +UPSTREAM_DB_NAME = config("UPSTREAM_DB_NAME", default="openledger") -RELATIVE_UPSTREAM_DB_HOST = os.getenv("RELATIVE_UPSTREAM_DB_HOST", UPSTREAM_DB_HOST) +RELATIVE_UPSTREAM_DB_HOST = config( + "RELATIVE_UPSTREAM_DB_HOST", + default=UPSTREAM_DB_HOST, +) """The hostname of the upstream DB from the POV of the downstream DB""" -RELATIVE_UPSTREAM_DB_PORT = int( - os.getenv("RELATIVE_UPSTREAM_DB_PORT", str(UPSTREAM_DB_PORT)) + +RELATIVE_UPSTREAM_DB_PORT = config( + "RELATIVE_UPSTREAM_DB_PORT", + default=UPSTREAM_DB_PORT, + cast=int, ) """The port of the upstream DB from the POV of the downstream DB""" diff --git a/ingestion_server/ingestion_server/state.py b/ingestion_server/ingestion_server/state.py index f98105f7173..8d7b25697a5 100644 --- a/ingestion_server/ingestion_server/state.py +++ b/ingestion_server/ingestion_server/state.py @@ -1,9 +1,9 @@ import datetime import enum import logging as log -import os import shelve +from decouple import config from filelock import FileLock @@ -18,8 +18,8 @@ """ -lock_path = os.getenv("LOCK_PATH", "lock") -shelf_path = os.getenv("SHELF_PATH", "db") +lock_path = config("LOCK_PATH", default="lock") +shelf_path = config("SHELF_PATH", default="db") class WorkerStatus(enum.Enum):