Use decouple to read settings from env vars

dhruvkb committed Sep 25, 2021 · commit 29aa052 (parent: 2482821)

Showing 5 changed files with 45 additions and 38 deletions.
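
The change is mechanical throughout: every os.getenv(name, default) call becomes a config(name, default=...) call from python-decouple, which checks the process environment first, then any discovered .env or settings.ini file, then the supplied default, and can convert the resolved string via its cast parameter. A minimal before/after sketch of the pattern (the variable name EXAMPLE_PORT is illustrative, not from the commit):

    import os

    from decouple import config

    # Before: manual lookup plus manual casting.
    port_old = int(os.getenv("EXAMPLE_PORT", 8000))

    # After: decouple resolves the value (environment, then config file,
    # then default) and `cast` handles the int conversion.
    port_new = config("EXAMPLE_PORT", default=8000, cast=int)

    assert port_old == port_new
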
@@ -9,18 +9,18 @@
 """
 import logging as log
 import math
-import os
 import socket
 import time
 
 import boto3
 import requests
+from decouple import config
 
 from ingestion_server.constants.media_types import MEDIA_TYPES
 from ingestion_server.state import register_indexing_job
 
 
-client = boto3.client("ec2", region_name=os.getenv("AWS_REGION", "us-east-1"))
+client = boto3.client("ec2", region_name=config("AWS_REGION", default="us-east-1"))
 
 
 def schedule_distributed_index(db_conn, target_index):
@@ -69,7 +69,7 @@ def _prepare_workers():
     :return: A list of private URLs pointing to each available indexing worker
     """
-    environment = os.getenv("ENVIRONMENT", "local")
+    environment = config("ENVIRONMENT", default="local")
     if environment == "local":
         return [socket.gethostbyname("indexer-worker")]
     instance_filters = [
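
One evaluation-time detail in the hunks above: the module-level boto3.client(...) call binds its region once, when the module is first imported, whereas the config("ENVIRONMENT", ...) call inside _prepare_workers is re-evaluated on every invocation (decouple consults os.environ at call time). A small sketch of the difference (module and names are illustrative, not from the commit):

    import os

    from decouple import config

    # Read once, at import time: later changes to os.environ are not seen.
    REGION = config("AWS_REGION", default="us-east-1")


    def current_environment():
        # Read on every call: reflects os.environ at call time.
        return config("ENVIRONMENT", default="local")


    if __name__ == "__main__":
        print(REGION)                 # "us-east-1" unless set before import
        os.environ["ENVIRONMENT"] = "production"
        print(current_environment())  # "production"
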
ingestion_server/ingestion_server/indexer.py (37 changes: 19 additions & 18 deletions)

@@ -1,7 +1,6 @@
 import argparse
 import datetime
 import logging as log
-import os
 import sys
 import time
 import uuid
@@ -10,6 +9,7 @@
 import elasticsearch
 import psycopg2
 from aws_requests_auth.aws_auth import AWSRequestsAuth
+from decouple import config
 from elasticsearch import Elasticsearch, NotFoundError, RequestsHttpConnection, helpers
 from elasticsearch.exceptions import ConnectionError as ESConnectionError
 from elasticsearch_dsl import Search, connections
@@ -40,29 +40,30 @@
 """
 
 # For AWS IAM access to Elasticsearch
-AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", "")
-AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", "")
+AWS_ACCESS_KEY_ID = config("AWS_ACCESS_KEY_ID", default="")
+AWS_SECRET_ACCESS_KEY = config("AWS_SECRET_ACCESS_KEY", default="")
 
-ELASTICSEARCH_URL = os.getenv("ELASTICSEARCH_URL", "localhost")
-ELASTICSEARCH_PORT = int(os.getenv("ELASTICSEARCH_PORT", 9200))
+ELASTICSEARCH_URL = config("ELASTICSEARCH_URL", default="localhost")
+ELASTICSEARCH_PORT = config("ELASTICSEARCH_PORT", default=9200, cast=int)
 
-AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
+AWS_REGION = config("AWS_REGION", "us-east-1")
 
-DATABASE_HOST = os.getenv("DATABASE_HOST", "localhost")
-DATABASE_PORT = int(os.getenv("DATABASE_PORT", 5432))
-DATABASE_USER = os.getenv("DATABASE_USER", "deploy")
-DATABASE_PASSWORD = os.getenv("DATABASE_PASSWORD", "deploy")
-DATABASE_NAME = os.getenv("DATABASE_NAME", "openledger")
+DATABASE_HOST = config("DATABASE_HOST", default="localhost")
+DATABASE_PORT = config("DATABASE_PORT", default=5432, cast=int)
+DATABASE_USER = config("DATABASE_USER", default="deploy")
+DATABASE_PASSWORD = config("DATABASE_PASSWORD", default="deploy")
+DATABASE_NAME = config("DATABASE_NAME", default="openledger")
 
 # The number of database records to load in memory at once.
-DB_BUFFER_SIZE = int(os.getenv("DB_BUFFER_SIZE", 100000))
+DB_BUFFER_SIZE = config("DB_BUFFER_SIZE", default=100000, cast=int)
 
-SYNCER_POLL_INTERVAL = int(os.getenv("SYNCER_POLL_INTERVAL", 60))
+SYNCER_POLL_INTERVAL = config("SYNCER_POLL_INTERVAL", default=60, cast=int)
 
 # A comma separated list of tables in the database to replicate to
 # Elasticsearch. Ex: image,docs
-REP_TABLES = os.getenv("COPY_TABLES", "image")
-replicate_tables = REP_TABLES.split(",") if "," in REP_TABLES else [REP_TABLES]
+REP_TABLES = config(
+    "COPY_TABLES", default="image", cast=lambda var: [s.strip() for s in var.split(",")]
+)
 
 TWELVE_HOURS_SEC = 60 * 60 * 12
 
@@ -351,7 +352,7 @@ def go_live(write_index, live_alias):
     # because there will only be one node available. In production, there
     # are many nodes, and the index should not be promoted until all
     # shards have been initialized.
-    environment = os.getenv("ENVIRONMENT", "local")
+    environment = config("ENVIRONMENT", default="local")
     if environment != "local":
         log.info("Waiting for replica shards. . .")
         es.cluster.health(index=write_index, wait_for_status="green", timeout="12h")
@@ -401,7 +402,7 @@ def reindex(self, model_name: str, distributed=None):
         suffix = uuid.uuid4().hex
         destination_index = f"{model_name}-{suffix}"
         if distributed is None:
-            distributed = os.getenv("ENVIRONMENT", "local") != "local"
+            distributed = config("ENVIRONMENT", default="local") != "local"
         if distributed:
             self.es.indices.create(
                 index=destination_index, body=index_settings(model_name)
@@ -471,7 +472,7 @@ def pg_chunk_to_es(pg_chunk, columns, origin_table, dest_index):
     log.getLogger(TableIndexer.__name__).setLevel(log.INFO)
     log.info("Connecting to Elasticsearch")
     elasticsearch_client = elasticsearch_connect()
-    syncer = TableIndexer(elasticsearch_client, replicate_tables)
+    syncer = TableIndexer(elasticsearch_client, REP_TABLES)
     if parsed.reindex:
         log.info(f"Reindexing {parsed.reindex}")
         syncer.reindex(parsed.reindex)
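
Two details in this file are worth calling out. First, AWS_REGION passes its default positionally; decouple accepts this because default is the second parameter of config, though the keyword form used at every other call site reads more clearly. Second, the COPY_TABLES setting now uses a cast callable, and decouple applies cast to the default as well as to values read from the environment, so REP_TABLES is always a list; that is why the old replicate_tables helper line is gone and TableIndexer receives REP_TABLES directly. A small sketch of the cast behaviour (standalone, assuming no .env or settings.ini file defines COPY_TABLES):

    import os

    from decouple import config


    def split_csv(var):
        # Mirrors the lambda used in the commit.
        return [s.strip() for s in var.split(",")]


    os.environ["COPY_TABLES"] = "image, audio"
    print(config("COPY_TABLES", default="image", cast=split_csv))
    # ['image', 'audio']

    del os.environ["COPY_TABLES"]
    print(config("COPY_TABLES", default="image", cast=split_csv))
    # ['image'] -- cast is applied to the default too
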
ingestion_server/ingestion_server/indexer_worker.py (10 changes: 5 additions & 5 deletions)

@@ -6,13 +6,13 @@
 data has been indexed, notify Ingestion Server and stop the instance.
 """
 import logging as log
-import os
 import sys
 from multiprocessing import Process, Value
 
 import boto3
 import falcon
 import requests
+from decouple import config
 from psycopg2.sql import SQL, Identifier, Literal
 
 from ingestion_server.constants.media_types import MEDIA_TYPES
@@ -22,9 +22,9 @@
 
 ec2_client = boto3.client(
     "ec2",
-    region_name=os.getenv("AWS_REGION", "us-east-1"),
-    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID", None),
-    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY", None),
+    region_name=config("AWS_REGION", default="us-east-1"),
+    aws_access_key_id=config("AWS_ACCESS_KEY_ID", default=None),
+    aws_secret_access_key=config("AWS_SECRET_ACCESS_KEY", default=None),
 )
 
 
@@ -93,7 +93,7 @@ def _self_destruct():
     Stop this EC2 instance once the task is finished.
     """
     # Get instance ID from AWS metadata service
-    if os.getenv("ENVIRONMENT", "local") == "local":
+    if config("ENVIRONMENT", default="local") == "local":
        log.info("Skipping self destruction because worker is in local environment")
        return
    endpoint = "http://169.254.169.254/latest/meta-data/instance-id"
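
Note the default=None cases above: as with os.getenv, an unset variable yields None (decouple applies no cast by default, so the None passes through untouched), and boto3 treats None credentials as "not provided", falling back to its usual credential chain (shared credentials file, instance profile, and so on). A minimal sketch:

    from decouple import config

    # Returns None when AWS_ACCESS_KEY_ID is set neither in the
    # environment nor in a config file; boto3 then resolves credentials
    # through its normal chain instead.
    access_key = config("AWS_ACCESS_KEY_ID", default=None)
    print(access_key)
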
ingestion_server/ingestion_server/ingest.py (24 changes: 15 additions & 9 deletions)

@@ -1,8 +1,8 @@
 import datetime
 import logging as log
-import os
 
 import psycopg2
+from decouple import config
 from psycopg2.extras import DictCursor
 from psycopg2.sql import SQL, Identifier, Literal
 
@@ -33,16 +33,22 @@
 data in place.
 """
 
-UPSTREAM_DB_HOST = os.getenv("UPSTREAM_DB_HOST", "localhost")
-UPSTREAM_DB_PORT = int(os.getenv("UPSTREAM_DB_PORT", 5433))
-UPSTREAM_DB_USER = os.getenv("UPSTREAM_DB_USER", "deploy")
-UPSTREAM_DB_PASSWORD = os.getenv("UPSTREAM_DB_PASSWORD", "deploy")
-UPSTREAM_DB_NAME = os.getenv("UPSTREAM_DB_NAME", "openledger")
+UPSTREAM_DB_HOST = config("UPSTREAM_DB_HOST", default="localhost")
+UPSTREAM_DB_PORT = config("UPSTREAM_DB_PORT", default=5433, cast=int)
+UPSTREAM_DB_USER = config("UPSTREAM_DB_USER", default="deploy")
+UPSTREAM_DB_PASSWORD = config("UPSTREAM_DB_PASSWORD", default="deploy")
+UPSTREAM_DB_NAME = config("UPSTREAM_DB_NAME", default="openledger")
 
-RELATIVE_UPSTREAM_DB_HOST = os.getenv("RELATIVE_UPSTREAM_DB_HOST", UPSTREAM_DB_HOST)
+RELATIVE_UPSTREAM_DB_HOST = config(
+    "RELATIVE_UPSTREAM_DB_HOST",
+    default=UPSTREAM_DB_HOST,
+)
 """The hostname of the upstream DB from the POV of the downstream DB"""
-RELATIVE_UPSTREAM_DB_PORT = int(
-    os.getenv("RELATIVE_UPSTREAM_DB_PORT", str(UPSTREAM_DB_PORT))
-)
+
+RELATIVE_UPSTREAM_DB_PORT = config(
+    "RELATIVE_UPSTREAM_DB_PORT",
+    default=UPSTREAM_DB_PORT,
+    cast=int,
+)
 """The port of the upstream DB from the POV of the downstream DB"""
 
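
The RELATIVE_UPSTREAM_* settings show defaults chaining to other settings. UPSTREAM_DB_PORT is already an int, and since decouple applies cast to whatever value it resolves (and int(5433) is a no-op), the str(...) round-trip from the old code is no longer needed. A short sketch of the pattern (standalone, assuming neither variable is set in the environment or a config file):

    from decouple import config

    UPSTREAM_DB_PORT = config("UPSTREAM_DB_PORT", default=5433, cast=int)

    # The default is another, already-cast setting; cast=int is harmless
    # when the fallback int is used and still converts an env-var string.
    RELATIVE_UPSTREAM_DB_PORT = config(
        "RELATIVE_UPSTREAM_DB_PORT",
        default=UPSTREAM_DB_PORT,
        cast=int,
    )

    print(RELATIVE_UPSTREAM_DB_PORT)  # 5433
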
ingestion_server/ingestion_server/state.py (6 changes: 3 additions & 3 deletions)

@@ -1,9 +1,9 @@
 import datetime
 import enum
 import logging as log
-import os
 import shelve
 
+from decouple import config
 from filelock import FileLock
 
 
@@ -18,8 +18,8 @@
 """
 
 
-lock_path = os.getenv("LOCK_PATH", "lock")
-shelf_path = os.getenv("SHELF_PATH", "db")
+lock_path = config("LOCK_PATH", default="lock")
+shelf_path = config("SHELF_PATH", default="db")
 
 
 class WorkerStatus(enum.Enum):
