From de1254848d62cfef76fb9ff3c391e11ff0c4ef74 Mon Sep 17 00:00:00 2001 From: vishreddy01 Date: Fri, 12 Apr 2024 15:54:32 -0700 Subject: [PATCH 01/12] pushing pg2pg ods replication scripts --- .github/workflows/docker-pg2pg.yaml | 65 +++++ shared/ods_replication_pg2pg/Dockerfile | 17 ++ .../data_replication_pg2pg.py | 224 ++++++++++++++++++ 3 files changed, 306 insertions(+) create mode 100644 .github/workflows/docker-pg2pg.yaml create mode 100644 shared/ods_replication_pg2pg/Dockerfile create mode 100644 shared/ods_replication_pg2pg/data_replication_pg2pg.py diff --git a/.github/workflows/docker-pg2pg.yaml b/.github/workflows/docker-pg2pg.yaml new file mode 100644 index 0000000..ebf9b71 --- /dev/null +++ b/.github/workflows/docker-pg2pg.yaml @@ -0,0 +1,65 @@ +name: Push to GHCR + +on: + push: + branches: [ "ods-replication-pg2pg" ] + +env: + # DF-NOTE: pull ghcr.io/bcgov/nr-dap-ods-trino:main + REGISTRY: ghcr.io + DOCKERFILE_PATH: shared/ods_replication_pg2pg + IMAGE_NAME: ${{ github.repository }}-ods-pg2pg + +jobs: + build: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + id-token: write + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Install cosign + if: github.event_name != 'pull_request' + uses: sigstore/cosign-installer@v3.3.0 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@f95db51fddba0c2d1ec667646a06c2ce06100226 # v3.0.0 + + - name: Log into registry ${{ env.REGISTRY }} + if: github.event_name != 'pull_request' + uses: docker/login-action@343f7c4344506bcbf9b4de18042ae17996df046d # v3.0.0 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract Docker metadata + id: meta + uses: docker/metadata-action@96383f45573cb7f253c731d3b3ab81c87ef81934 # v5.0.0 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + + - name: Build and push Docker image + id: build-and-push + uses: docker/build-push-action@0565240e2d4ab88bba5387d719585280857ece09 # v5.0.0 + with: + # DF-NOTE: to help the action find the Dockerfile to build from + context: ${{ env.DOCKERFILE_PATH }}/ + push: ${{ github.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + cache-from: type=gha + cache-to: type=gha,mode=max + + #- name: Sign the published Docker image + #if: ${{ github.event_name != 'pull_request' }} + #env: + # https://docs.github.com/en/actions/security-guides/security-hardening-for-github-actions#using-an-intermediate-environment-variable + #TAGS: ${{ steps.meta.outputs.tags }} + #DIGEST: ${{ steps.build-and-push.outputs.digest }} + + #run: echo "${TAGS}" | xargs -I {} cosign sign --yes {}@${DIGEST} diff --git a/shared/ods_replication_pg2pg/Dockerfile b/shared/ods_replication_pg2pg/Dockerfile new file mode 100644 index 0000000..aba69ca --- /dev/null +++ b/shared/ods_replication_pg2pg/Dockerfile @@ -0,0 +1,17 @@ +FROM python:3.11.4-slim-buster + +WORKDIR /app + +# PostgreSQL library + +RUN apt-get update \ + && apt-get -y install libpq-dev gcc \ + && pip install psycopg2 + +ADD *.py . + +#COPY requirements.txt requirements.txt + +#RUN pip3 install -r requirements.txt + +CMD ["python3", "./data_replication_pg2pg.py"] \ No newline at end of file diff --git a/shared/ods_replication_pg2pg/data_replication_pg2pg.py b/shared/ods_replication_pg2pg/data_replication_pg2pg.py new file mode 100644 index 0000000..db61dec --- /dev/null +++ b/shared/ods_replication_pg2pg/data_replication_pg2pg.py @@ -0,0 +1,224 @@ +#!/usr/bin/env python +# coding: utf-8 + +# In[1]: Imports +# refer if block at line 38, some imports are conditional +import psycopg2 +import psycopg2.pool +import psycopg2.extras +from psycopg2.extras import execute_batch +import configparser +import time +import json +import concurrent.futures +from datetime import datetime +import sys +import os +import argparse + + +start = time.time() + +# In[3]: Retrieve Oracle database configuration +src_postgres_username = os.environ['DB_USERNAME'] +src_postgres_password = os.environ['DB_PASSWORD'] +src_postgres_host = os.environ['DB_HOST'] +src_postgres_port = os.environ['DB_PORT'] +src_postgres_database = os.environ['DATABASE'] +# In[4]: Retrieve Postgres database configuration +postgres_username = os.environ['ODS_USERNAME'] +postgres_password = os.environ['ODS_PASSWORD'] +postgres_host = os.environ['ODS_HOST'] +postgres_port = os.environ['ODS_PORT'] +postgres_database = os.environ['ODS_DATABASE'] +# In[5]: Script parameters +mstr_schema = os.environ['MSTR_SCHEMA'] +app_name = os.environ['APP_NAME'] +concurrent_tasks = int(os.environ['CONCUR_TASKS']) +audit_table = 'audit_batch_status' +current_date = datetime.now().strftime('%Y-%m-%d') + +#concurrent_tasks = int(concurrent_tasks) +#In[5]: Concurrent tasks - number of tables to be replicated in parallel +#concurrent_tasks = 5 + +# In[6]: Set up Oracle connection pool +# In[7]: Setup Postgres Pool +SrcPgresPool = psycopg2.pool.ThreadedConnectionPool( + minconn = concurrent_tasks, maxconn = concurrent_tasks,host=src_postgres_host, port=src_postgres_port, dbname=src_postgres_database, user=src_postgres_username, password=src_postgres_password +) +print('Source Postgres Connection Successful') + +# In[7]: Setup Postgres Pool +PgresPool = psycopg2.pool.ThreadedConnectionPool( + minconn = concurrent_tasks, maxconn = concurrent_tasks,host=postgres_host, port=postgres_port, dbname=postgres_database, user=postgres_username, password=postgres_password +) +print('Target Postgres Connection Successful') + +def del_audit_entries_rerun(current_date): + postgres_connection = PgresPool.getconn() + postgres_cursor = postgres_connection.cursor() + del_sql = f""" + DELETE FROM {mstr_schema}.{audit_table} c + where application_name='{app_name}' and batch_run_date='{current_date}' + """ + postgres_cursor.execute(del_sql) + postgres_connection.commit() + postgres_cursor.close() + PgresPool.putconn(postgres_connection) + return print(del_sql) + +# Function to insert the audit batch status entry +def audit_batch_status_insert(table_name,status): + postgres_connection = PgresPool.getconn() + postgres_cursor = postgres_connection.cursor() + try: + audit_batch_status_query = f"""INSERT INTO {mstr_schema}.{audit_table} VALUES ('{table_name}','{app_name}','replication','{status}',current_date)""" + print(audit_batch_status_query) + postgres_cursor.execute(audit_batch_status_query) + postgres_connection.commit() + print(f"Record inserted into audit batch status table") + return None + except Exception as e: + print(f"Error inserting record into to audit batch status table: {str(e)}") + return None + finally: + # Return the connection to the pool + if postgres_connection: + postgres_cursor.close() + PgresPool.putconn(postgres_connection) + +# In[8]: Function to get active rows from master table +def get_active_tables(mstr_schema,app_name): + postgres_connection = PgresPool.getconn() + postgres_cursor = postgres_connection.cursor() + list_sql = f""" + SELECT application_name,source_schema_name,source_table_name,target_schema_name,target_table_name,truncate_flag,cdc_flag,full_inc_flag,cdc_column,replication_order,customsql_ind,customsql_query + from {mstr_schema}.cdc_master_table_list c + where active_ind = 'Y' and application_name='{app_name}' + order by replication_order, source_table_name + """ + with postgres_connection.cursor() as curs: + curs.execute(list_sql) + rows = curs.fetchall() + postgres_connection.commit() + postgres_cursor.close() + PgresPool.putconn(postgres_connection) + return rows + +# In[9]: Function to extract data from Oracle +def extract_from_srcpg(table_name,source_schema,customsql_ind,customsql_query): + # Acquire a connection from the pool + srcpostgres_connection = SrcPgresPool.getconn() + srcpostgres_cursor = srcpostgres_connection.cursor() + try: + if customsql_ind == "Y": + # Use placeholders in the query and bind the table name as a parameter + sql_query=customsql_query + print(sql_query) + srcpostgres_cursor.execute(sql_query) + rows = srcpostgres_cursor.fetchall() + #OrcPool.release(oracle_connection) + return rows + else: + sql_query = f'SELECT * FROM {source_schema}.{table_name}' + print(sql_query) + srcpostgres_cursor.execute(sql_query) + rows = srcpostgres_cursor.fetchall() + #OrcPool.release(oracle_connection) + return rows + + except Exception as e: + audit_batch_status_insert(table_name,'failed') + print(f"Error extracting data from SrcPostgres: {str(e)}") + #OrcPool.release(oracle_connection) #Temporary change + return [] + + finally: + # Return the connection to the pool + if srcpostgres_connection: + srcpostgres_cursor.close() + SrcPgresPool.putconn(srcpostgres_connection) +# In[10]: Function to load data into Target PostgreSQL using data from Source Oracle +def load_into_postgres(table_name, data,target_schema): + postgres_connection = PgresPool.getconn() + postgres_cursor = postgres_connection.cursor() + try: + # Delete existing data in the target table + delete_query = f'TRUNCATE TABLE {target_schema}.{table_name}' + postgres_cursor.execute(delete_query) + + # Build the INSERT query with placeholders + insert_query = f'INSERT INTO {target_schema}.{table_name} VALUES ({", ".join(["%s"] * len(data[0]))})' + #insert_query = f'INSERT INTO {target_schema}.{table_name} VALUES %s' + + # Use execute_batch for efficient batch insert + with postgres_connection.cursor() as cursor: + # Prepare the data as a list of tuples + data_to_insert = [(tuple(row)) for row in data] + execute_batch(cursor, insert_query, data_to_insert) + postgres_connection.commit() + # Insert record to audit batch table + audit_batch_status_insert(table_name,'success') + + + except Exception as e: + print(f"Error loading data into PostgreSQL: {str(e)}") + audit_batch_status_insert(table_name,'failed') + finally: + # Return the connection to the pool + if postgres_connection: + postgres_cursor.close() + PgresPool.putconn(postgres_connection) + +# In[11]: Function to call both extract and load functions +def load_data_from_src_tgt(table_name,source_schema,target_schema,customsql_ind,customsql_query): + # Extract data from Oracle + print(f'Source: Thread {table_name} started at ' + datetime.now().strftime("%H:%M:%S")) + srcpg_data = extract_from_srcpg(table_name,source_schema,customsql_ind,customsql_query) # Ensure table name is in uppercase + print(f'Source: Extraction for {table_name} completed at ' + datetime.now().strftime("%H:%M:%S")) + + if srcpg_data: + # Load data into PostgreSQL + load_into_postgres(table_name, srcpg_data, target_schema) + print(f"Target: Data loaded into table: {table_name}") + print(f'Target: Thread {table_name} ended at ' + datetime.now().strftime("%H:%M:%S")) + +# In[12]: Initializing concurrency +if __name__ == '__main__': + # Main ETL process + active_tables_rows =get_active_tables(mstr_schema,app_name) + #print(active_tables_rows) + tables_to_extract = [(row[2],row[1],row[3],row[10],row[11]) for row in active_tables_rows] + + print(f"tables to extract are {tables_to_extract}") + print(f'No of concurrent tasks:{concurrent_tasks}') + #Delete audit entries for rerun on same day + del_audit_entries_rerun(current_date) + # Using ThreadPoolExecutor to run tasks concurrently + with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_tasks) as executor: + # Submit tasks to the executor + future_to_table = {executor.submit(load_data_from_src_tgt, table[0],table[1],table[2],table[3],table[4]): table for table in tables_to_extract} + + # Wait for all tasks to complete + concurrent.futures.wait(future_to_table) + + # Print results + for future in future_to_table: + table_name = future_to_table[future] + try: + # Get the result of the task, if any + future.result() + except Exception as e: + # Handle exceptions that occurred during the task + print(f"Error replicating {table_name}: {e}") + audit_batch_status_insert(table_name[0],'failed') + + # record end time + end = time.time() + SrcPgresPool.closeall() + PgresPool.closeall() + + print("ETL process completed successfully.") + print("The time of execution of the program is:", (end - start) , "secs") + From 172e2124d7604b6a078d115067cda989386081c3 Mon Sep 17 00:00:00 2001 From: vishreddy01 Date: Fri, 12 Apr 2024 15:56:51 -0700 Subject: [PATCH 02/12] changed image name --- .github/workflows/docker-pg2pg.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/docker-pg2pg.yaml b/.github/workflows/docker-pg2pg.yaml index ebf9b71..141f32b 100644 --- a/.github/workflows/docker-pg2pg.yaml +++ b/.github/workflows/docker-pg2pg.yaml @@ -8,7 +8,7 @@ env: # DF-NOTE: pull ghcr.io/bcgov/nr-dap-ods-trino:main REGISTRY: ghcr.io DOCKERFILE_PATH: shared/ods_replication_pg2pg - IMAGE_NAME: ${{ github.repository }}-ods-pg2pg + IMAGE_NAME: ${{ github.repository }}-pg2pg jobs: build: From 3887e5b6e9b190e2d288324f7f7960e2d0034c6a Mon Sep 17 00:00:00 2001 From: vishreddy01 Date: Fri, 12 Apr 2024 16:06:24 -0700 Subject: [PATCH 03/12] Added requirements.txt --- shared/ods_replication_pg2pg/Dockerfile | 4 ++-- shared/ods_replication_pg2pg/requirements.txt | 12 ++++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) create mode 100644 shared/ods_replication_pg2pg/requirements.txt diff --git a/shared/ods_replication_pg2pg/Dockerfile b/shared/ods_replication_pg2pg/Dockerfile index aba69ca..3daf395 100644 --- a/shared/ods_replication_pg2pg/Dockerfile +++ b/shared/ods_replication_pg2pg/Dockerfile @@ -10,8 +10,8 @@ RUN apt-get update \ ADD *.py . -#COPY requirements.txt requirements.txt +COPY requirements.txt requirements.txt -#RUN pip3 install -r requirements.txt +RUN pip3 install -r requirements.txt CMD ["python3", "./data_replication_pg2pg.py"] \ No newline at end of file diff --git a/shared/ods_replication_pg2pg/requirements.txt b/shared/ods_replication_pg2pg/requirements.txt new file mode 100644 index 0000000..9400b42 --- /dev/null +++ b/shared/ods_replication_pg2pg/requirements.txt @@ -0,0 +1,12 @@ +oracledb==1.3.1 +psycopg2==2.9.6 +pandas==2.0.2 +openpyxl==3.1.2 +configparser==6.0.0 +dbt-postgres==1.6.4 +PyYAML==6.0 +pyodbc==4.0.39 +python-dotenv==1.0.0 +boto3==1.28.10 +requests==2.31.0 +XlsxWriter==3.1.2 \ No newline at end of file From 24abbddd50269e63ceec637c8bd69b71f872f103 Mon Sep 17 00:00:00 2001 From: vishreddy01 Date: Fri, 12 Apr 2024 16:09:18 -0700 Subject: [PATCH 04/12] chnaged github action push branch to main --- .github/workflows/docker-pg2pg.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/docker-pg2pg.yaml b/.github/workflows/docker-pg2pg.yaml index 141f32b..97a62c1 100644 --- a/.github/workflows/docker-pg2pg.yaml +++ b/.github/workflows/docker-pg2pg.yaml @@ -2,7 +2,7 @@ name: Push to GHCR on: push: - branches: [ "ods-replication-pg2pg" ] + branches: [ "main" ] env: # DF-NOTE: pull ghcr.io/bcgov/nr-dap-ods-trino:main From dea41c6f618530597f29f5363f544d9d3f4d582b Mon Sep 17 00:00:00 2001 From: vishreddy01 Date: Tue, 16 Apr 2024 14:16:14 -0700 Subject: [PATCH 05/12] changed the github action code --- .github/workflows/docker-pg2pg.yaml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/docker-pg2pg.yaml b/.github/workflows/docker-pg2pg.yaml index 97a62c1..1c2d135 100644 --- a/.github/workflows/docker-pg2pg.yaml +++ b/.github/workflows/docker-pg2pg.yaml @@ -2,7 +2,7 @@ name: Push to GHCR on: push: - branches: [ "main" ] + branches: [ "ods-replication-pg2pg" ] env: # DF-NOTE: pull ghcr.io/bcgov/nr-dap-ods-trino:main @@ -55,11 +55,11 @@ jobs: cache-from: type=gha cache-to: type=gha,mode=max - #- name: Sign the published Docker image - #if: ${{ github.event_name != 'pull_request' }} - #env: - # https://docs.github.com/en/actions/security-guides/security-hardening-for-github-actions#using-an-intermediate-environment-variable - #TAGS: ${{ steps.meta.outputs.tags }} - #DIGEST: ${{ steps.build-and-push.outputs.digest }} + - name: Sign the published Docker image + if: ${{ github.event_name != 'pull_request' }} + env: + https://docs.github.com/en/actions/security-guides/security-hardening-for-github-actions#using-an-intermediate-environment-variable + TAGS: ${{ steps.meta.outputs.tags }} + DIGEST: ${{ steps.build-and-push.outputs.digest }} - #run: echo "${TAGS}" | xargs -I {} cosign sign --yes {}@${DIGEST} + run: echo "${TAGS}" | xargs -I {} cosign sign --yes {}@${DIGEST} From 75e12b0f8768317d2dbb1be28cd6c33ec628eb3d Mon Sep 17 00:00:00 2001 From: vishreddy01 Date: Tue, 16 Apr 2024 14:21:40 -0700 Subject: [PATCH 06/12] Corrected the github action code --- .github/workflows/docker-pg2pg.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/docker-pg2pg.yaml b/.github/workflows/docker-pg2pg.yaml index 1c2d135..9fe6759 100644 --- a/.github/workflows/docker-pg2pg.yaml +++ b/.github/workflows/docker-pg2pg.yaml @@ -2,7 +2,7 @@ name: Push to GHCR on: push: - branches: [ "ods-replication-pg2pg" ] + branches: [ "main" ] env: # DF-NOTE: pull ghcr.io/bcgov/nr-dap-ods-trino:main @@ -58,7 +58,7 @@ jobs: - name: Sign the published Docker image if: ${{ github.event_name != 'pull_request' }} env: - https://docs.github.com/en/actions/security-guides/security-hardening-for-github-actions#using-an-intermediate-environment-variable + # https://docs.github.com/en/actions/security-guides/security-hardening-for-github-actions#using-an-intermediate-environment-variable TAGS: ${{ steps.meta.outputs.tags }} DIGEST: ${{ steps.build-and-push.outputs.digest }} From 7b9041d5fd626d830fee0a8916cf1d704a68d5c4 Mon Sep 17 00:00:00 2001 From: vishreddy01 Date: Wed, 17 Apr 2024 09:59:41 -0700 Subject: [PATCH 07/12] updated the exit codes --- .../data_replication_pg2pg.py | 53 ++++++++++++++----- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/shared/ods_replication_pg2pg/data_replication_pg2pg.py b/shared/ods_replication_pg2pg/data_replication_pg2pg.py index db61dec..130972b 100644 --- a/shared/ods_replication_pg2pg/data_replication_pg2pg.py +++ b/shared/ods_replication_pg2pg/data_replication_pg2pg.py @@ -26,11 +26,11 @@ src_postgres_port = os.environ['DB_PORT'] src_postgres_database = os.environ['DATABASE'] # In[4]: Retrieve Postgres database configuration -postgres_username = os.environ['ODS_USERNAME'] -postgres_password = os.environ['ODS_PASSWORD'] -postgres_host = os.environ['ODS_HOST'] -postgres_port = os.environ['ODS_PORT'] -postgres_database = os.environ['ODS_DATABASE'] +postgres_username = os.environ['TGT_USERNAME'] +postgres_password = os.environ['TGT_PASSWORD'] +postgres_host = os.environ['TGT_HOST'] +postgres_port = os.environ['TGT_PORT'] +postgres_database = os.environ['TGT_DATABASE'] # In[5]: Script parameters mstr_schema = os.environ['MSTR_SCHEMA'] app_name = os.environ['APP_NAME'] @@ -44,16 +44,42 @@ # In[6]: Set up Oracle connection pool # In[7]: Setup Postgres Pool -SrcPgresPool = psycopg2.pool.ThreadedConnectionPool( - minconn = concurrent_tasks, maxconn = concurrent_tasks,host=src_postgres_host, port=src_postgres_port, dbname=src_postgres_database, user=src_postgres_username, password=src_postgres_password -) -print('Source Postgres Connection Successful') + +try: + SrcPgresPool = psycopg2.pool.ThreadedConnectionPool( + minconn=concurrent_tasks, + maxconn=concurrent_tasks, + host=src_postgres_host, + port=src_postgres_port, + dbname=src_postgres_database, + user=src_postgres_username, + password=src_postgres_password + ) + print('Source Postgres Connection Successful') + # Exit with code 0 (success) if connection successful +except Exception as e: + print(f'Error connecting to Source PostgreSQL: {e}') + # Exit with code 1 (failure) if connection unsuccessful + sys.exit(1) + # In[7]: Setup Postgres Pool -PgresPool = psycopg2.pool.ThreadedConnectionPool( - minconn = concurrent_tasks, maxconn = concurrent_tasks,host=postgres_host, port=postgres_port, dbname=postgres_database, user=postgres_username, password=postgres_password -) -print('Target Postgres Connection Successful') +try: + PgresPool = psycopg2.pool.ThreadedConnectionPool( + minconn=concurrent_tasks, + maxconn=concurrent_tasks, + host=postgres_host, + port=postgres_port, + dbname=postgres_database, + user=postgres_username, + password=postgres_password + ) + print('Target Postgres Connection Successful') + +except psycopg2.OperationalError as e: + print(f'Error connecting to Target PostgreSQL: {e}') + # Exit with code 1 (failure) if connection unsuccessful + sys.exit(2) def del_audit_entries_rerun(current_date): postgres_connection = PgresPool.getconn() @@ -222,3 +248,4 @@ def load_data_from_src_tgt(table_name,source_schema,target_schema,customsql_ind, print("ETL process completed successfully.") print("The time of execution of the program is:", (end - start) , "secs") +sys.exit(0) \ No newline at end of file From 126b2d83430c03af5e4abdd7f5d14baa3c8ad879 Mon Sep 17 00:00:00 2001 From: vishreddy01 Date: Wed, 17 Apr 2024 11:15:08 -0700 Subject: [PATCH 08/12] changed github action to push to ods-replication-pg2pg --- .github/workflows/docker-pg2pg.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/docker-pg2pg.yaml b/.github/workflows/docker-pg2pg.yaml index 9fe6759..2439c94 100644 --- a/.github/workflows/docker-pg2pg.yaml +++ b/.github/workflows/docker-pg2pg.yaml @@ -2,7 +2,7 @@ name: Push to GHCR on: push: - branches: [ "main" ] + branches: [ "ods-replication-pg2pg" ] env: # DF-NOTE: pull ghcr.io/bcgov/nr-dap-ods-trino:main From 4bcaf1950295e904cbdd22df671360e6daf6a7aa Mon Sep 17 00:00:00 2001 From: vishreddy01 Date: Wed, 17 Apr 2024 16:13:48 -0700 Subject: [PATCH 09/12] changed the target postgres variables --- shared/ods_replication_pg2pg/data_replication_pg2pg.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/shared/ods_replication_pg2pg/data_replication_pg2pg.py b/shared/ods_replication_pg2pg/data_replication_pg2pg.py index 130972b..6b01dc6 100644 --- a/shared/ods_replication_pg2pg/data_replication_pg2pg.py +++ b/shared/ods_replication_pg2pg/data_replication_pg2pg.py @@ -26,11 +26,11 @@ src_postgres_port = os.environ['DB_PORT'] src_postgres_database = os.environ['DATABASE'] # In[4]: Retrieve Postgres database configuration -postgres_username = os.environ['TGT_USERNAME'] -postgres_password = os.environ['TGT_PASSWORD'] -postgres_host = os.environ['TGT_HOST'] -postgres_port = os.environ['TGT_PORT'] -postgres_database = os.environ['TGT_DATABASE'] +postgres_username = os.environ['ODS_USERNAME'] +postgres_password = os.environ['ODS_PASSWORD'] +postgres_host = os.environ['ODS_HOST'] +postgres_port = os.environ['ODS_PORT'] +postgres_database = os.environ['ODS_DATABASE'] # In[5]: Script parameters mstr_schema = os.environ['MSTR_SCHEMA'] app_name = os.environ['APP_NAME'] From d04efa54fa21128ccd0433d4ff6860df055f4664 Mon Sep 17 00:00:00 2001 From: vishreddy01 Date: Wed, 17 Apr 2024 16:32:51 -0700 Subject: [PATCH 10/12] added the exit codes for pg2pg script --- .../data_replication_pg2pg.py | 44 ++++++++++++------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/shared/ods_replication_pg2pg/data_replication_pg2pg.py b/shared/ods_replication_pg2pg/data_replication_pg2pg.py index 6b01dc6..c2e03f8 100644 --- a/shared/ods_replication_pg2pg/data_replication_pg2pg.py +++ b/shared/ods_replication_pg2pg/data_replication_pg2pg.py @@ -84,15 +84,20 @@ def del_audit_entries_rerun(current_date): postgres_connection = PgresPool.getconn() postgres_cursor = postgres_connection.cursor() - del_sql = f""" - DELETE FROM {mstr_schema}.{audit_table} c - where application_name='{app_name}' and batch_run_date='{current_date}' - """ - postgres_cursor.execute(del_sql) - postgres_connection.commit() - postgres_cursor.close() - PgresPool.putconn(postgres_connection) - return print(del_sql) + try: + del_sql = f""" + DELETE FROM {mstr_schema}.{audit_table} c + where application_name='{app_name}' and batch_run_date='{current_date}' + """ + postgres_cursor.execute(del_sql) + postgres_connection.commit() + postgres_cursor.close() + PgresPool.putconn(postgres_connection) + print(del_sql) + return None + except Exception as e: + print(f"Failed to delete audit entries for application name {app_name} and date '{current_date}': {str(e)}") + sys.exit(3) # Function to insert the audit batch status entry def audit_batch_status_insert(table_name,status): @@ -107,6 +112,7 @@ def audit_batch_status_insert(table_name,status): return None except Exception as e: print(f"Error inserting record into to audit batch status table: {str(e)}") + sys.exit(4) return None finally: # Return the connection to the pool @@ -124,13 +130,19 @@ def get_active_tables(mstr_schema,app_name): where active_ind = 'Y' and application_name='{app_name}' order by replication_order, source_table_name """ - with postgres_connection.cursor() as curs: - curs.execute(list_sql) - rows = curs.fetchall() - postgres_connection.commit() - postgres_cursor.close() - PgresPool.putconn(postgres_connection) - return rows + try: + with postgres_connection.cursor() as curs: + curs.execute(list_sql) + rows = curs.fetchall() + postgres_connection.commit() + postgres_cursor.close() + PgresPool.putconn(postgres_connection) + return rows + except Exception as e: + print(f"Error selecting record from cdc_master_table_list table: {str(e)}") + sys.exit(5) + return None + # In[9]: Function to extract data from Oracle def extract_from_srcpg(table_name,source_schema,customsql_ind,customsql_query): From 5ef9afdde096931ec53d96151a9a2052b7aefde7 Mon Sep 17 00:00:00 2001 From: vishreddy01 Date: Thu, 18 Apr 2024 10:54:18 -0700 Subject: [PATCH 11/12] change the exit codes for the pg2pg script --- .../data_replication_pg2pg.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/shared/ods_replication_pg2pg/data_replication_pg2pg.py b/shared/ods_replication_pg2pg/data_replication_pg2pg.py index c2e03f8..f315e39 100644 --- a/shared/ods_replication_pg2pg/data_replication_pg2pg.py +++ b/shared/ods_replication_pg2pg/data_replication_pg2pg.py @@ -222,6 +222,30 @@ def load_data_from_src_tgt(table_name,source_schema,target_schema,customsql_ind, print(f"Target: Data loaded into table: {table_name}") print(f'Target: Thread {table_name} ended at ' + datetime.now().strftime("%H:%M:%S")) +def check_failed_tables(mstr_schema,app_name,current_date): + postgres_connection = PgresPool.getconn() + postgres_cursor = postgres_connection.cursor() + list_sql = f""" + SELECT object_name + from {mstr_schema}.audit_batch_status c + where application_name='{app_name}' and batch_run_date='{current_date}' and object_execution_status='failed' + order by replication_order, source_table_name + """ + try: + with postgres_connection.cursor() as curs: + curs.execute(list_sql) + rows = curs.fetchall() + postgres_connection.commit() + postgres_cursor.close() + PgresPool.putconn(postgres_connection) + return rows + except Exception as e: + print(f"Error selecting record from cdc_master_table_list table: {str(e)}") + sys.exit(6) + return None + + + # In[12]: Initializing concurrency if __name__ == '__main__': # Main ETL process @@ -252,6 +276,15 @@ def load_data_from_src_tgt(table_name,source_schema,target_schema,customsql_ind, print(f"Error replicating {table_name}: {e}") audit_batch_status_insert(table_name[0],'failed') + get_failed_tables_rows = check_failed_tables(mstr_schema,app_name,current_date) + + if len(get_failed_tables_rows)>0: + print("Some of the tables are failed while replication for an application:",{app_name}) + sys.exit(7) + else: + print("All the tables have been successfully replicated for an application:",{app_name}) + + # record end time end = time.time() SrcPgresPool.closeall() From e266d1927c9848a799fa0f01f9ff4901c0bc932a Mon Sep 17 00:00:00 2001 From: vishreddy01 Date: Thu, 18 Apr 2024 12:30:03 -0700 Subject: [PATCH 12/12] changed the elect query for pg2pg replication --- shared/ods_replication_pg2pg/data_replication_pg2pg.py | 1 - 1 file changed, 1 deletion(-) diff --git a/shared/ods_replication_pg2pg/data_replication_pg2pg.py b/shared/ods_replication_pg2pg/data_replication_pg2pg.py index f315e39..4a8222d 100644 --- a/shared/ods_replication_pg2pg/data_replication_pg2pg.py +++ b/shared/ods_replication_pg2pg/data_replication_pg2pg.py @@ -229,7 +229,6 @@ def check_failed_tables(mstr_schema,app_name,current_date): SELECT object_name from {mstr_schema}.audit_batch_status c where application_name='{app_name}' and batch_run_date='{current_date}' and object_execution_status='failed' - order by replication_order, source_table_name """ try: with postgres_connection.cursor() as curs: