Merge pull request #1 from bcgov/ods-replication-pg2pg
Ods-replication-pg2pg
Showing 4 changed files with 389 additions and 0 deletions.
@@ -0,0 +1,65 @@
name: Push to GHCR

on:
  push:
    branches: [ "ods-replication-pg2pg" ]

env:
  # DF-NOTE: pull ghcr.io/bcgov/nr-dap-ods-trino:main
  REGISTRY: ghcr.io
  DOCKERFILE_PATH: shared/ods_replication_pg2pg
  IMAGE_NAME: ${{ github.repository }}-pg2pg

jobs:
  build:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
      id-token: write

    steps:
      - name: Checkout repository
        uses: actions/checkout@v3

      - name: Install cosign
        if: github.event_name != 'pull_request'
        uses: sigstore/cosign-installer@v3.3.0

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@f95db51fddba0c2d1ec667646a06c2ce06100226 # v3.0.0

      - name: Log into registry ${{ env.REGISTRY }}
        if: github.event_name != 'pull_request'
        uses: docker/login-action@343f7c4344506bcbf9b4de18042ae17996df046d # v3.0.0
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Extract Docker metadata
        id: meta
        uses: docker/metadata-action@96383f45573cb7f253c731d3b3ab81c87ef81934 # v5.0.0
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}

      - name: Build and push Docker image
        id: build-and-push
        uses: docker/build-push-action@0565240e2d4ab88bba5387d719585280857ece09 # v5.0.0
        with:
          # DF-NOTE: to help the action find the Dockerfile to build from
          context: ${{ env.DOCKERFILE_PATH }}/
          push: ${{ github.event_name != 'pull_request' }}
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

      - name: Sign the published Docker image
        if: ${{ github.event_name != 'pull_request' }}
        env:
          # https://docs.github.com/en/actions/security-guides/security-hardening-for-github-actions#using-an-intermediate-environment-variable
          TAGS: ${{ steps.meta.outputs.tags }}
          DIGEST: ${{ steps.build-and-push.outputs.digest }}
        run: echo "${TAGS}" | xargs -I {} cosign sign --yes {}@${DIGEST}
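
Note: the image is signed keylessly via the GitHub OIDC token (hence the id-token: write permission). A minimal verification sketch with cosign v2 follows; <owner>/<repo> stands in for github.repository, and the tag assumes metadata-action's default branch tag, so both are assumptions rather than values from this commit.

# Hypothetical check of an image signed by the workflow above; placeholders must be filled in.
cosign verify \
  --certificate-oidc-issuer https://token.actions.githubusercontent.com \
  --certificate-identity-regexp 'https://github.com/bcgov/.*' \
  ghcr.io/<owner>/<repo>-pg2pg:ods-replication-pg2pg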
@@ -0,0 +1,17 @@
FROM python:3.11.4-slim-buster

WORKDIR /app

# PostgreSQL client library: psycopg2 builds against libpq-dev and needs gcc

RUN apt-get update \
    && apt-get -y install libpq-dev gcc \
    && pip install psycopg2

ADD *.py .

COPY requirements.txt requirements.txt

RUN pip3 install -r requirements.txt

CMD ["python3", "./data_replication_pg2pg.py"]
@@ -0,0 +1,295 @@
#!/usr/bin/env python
# coding: utf-8

# In[1]: Imports
import psycopg2
import psycopg2.pool
import psycopg2.extras
from psycopg2.extras import execute_batch
import configparser
import time
import json
import concurrent.futures
from datetime import datetime
import sys
import os
import argparse


# Record the start time so total run time can be reported at the end
start = time.time()

# In[3]: Retrieve source Postgres database configuration
src_postgres_username = os.environ['DB_USERNAME']
src_postgres_password = os.environ['DB_PASSWORD']
src_postgres_host = os.environ['DB_HOST']
src_postgres_port = os.environ['DB_PORT']
src_postgres_database = os.environ['DATABASE']

# In[4]: Retrieve target Postgres (ODS) database configuration
postgres_username = os.environ['ODS_USERNAME']
postgres_password = os.environ['ODS_PASSWORD']
postgres_host = os.environ['ODS_HOST']
postgres_port = os.environ['ODS_PORT']
postgres_database = os.environ['ODS_DATABASE']

# In[5]: Script parameters
mstr_schema = os.environ['MSTR_SCHEMA']
app_name = os.environ['APP_NAME']
# Number of tables to be replicated in parallel
concurrent_tasks = int(os.environ['CONCUR_TASKS'])
audit_table = 'audit_batch_status'
current_date = datetime.now().strftime('%Y-%m-%d')

# In[6]: Set up source Postgres connection pool

try:
    SrcPgresPool = psycopg2.pool.ThreadedConnectionPool(
        minconn=concurrent_tasks,
        maxconn=concurrent_tasks,
        host=src_postgres_host,
        port=src_postgres_port,
        dbname=src_postgres_database,
        user=src_postgres_username,
        password=src_postgres_password
    )
    print('Source Postgres Connection Successful')
except Exception as e:
    print(f'Error connecting to Source PostgreSQL: {e}')
    # Exit with a non-zero code if the connection fails
    sys.exit(1)


# In[7]: Set up target Postgres connection pool
try:
    PgresPool = psycopg2.pool.ThreadedConnectionPool(
        minconn=concurrent_tasks,
        maxconn=concurrent_tasks,
        host=postgres_host,
        port=postgres_port,
        dbname=postgres_database,
        user=postgres_username,
        password=postgres_password
    )
    print('Target Postgres Connection Successful')
except psycopg2.OperationalError as e:
    print(f'Error connecting to Target PostgreSQL: {e}')
    # Exit with a distinct non-zero code if the connection fails
    sys.exit(2)

# Function to delete audit entries for a same-day rerun
def del_audit_entries_rerun(current_date):
    postgres_connection = PgresPool.getconn()
    postgres_cursor = postgres_connection.cursor()
    try:
        del_sql = f"""
        DELETE FROM {mstr_schema}.{audit_table} c
        WHERE application_name = '{app_name}' AND batch_run_date = '{current_date}'
        """
        postgres_cursor.execute(del_sql)
        postgres_connection.commit()
        postgres_cursor.close()
        PgresPool.putconn(postgres_connection)
        print(del_sql)
        return None
    except Exception as e:
        print(f"Failed to delete audit entries for application name {app_name} and date '{current_date}': {str(e)}")
        sys.exit(3)


# Function to insert an audit batch status entry
def audit_batch_status_insert(table_name, status):
    postgres_connection = PgresPool.getconn()
    postgres_cursor = postgres_connection.cursor()
    try:
        audit_batch_status_query = f"""INSERT INTO {mstr_schema}.{audit_table} VALUES ('{table_name}','{app_name}','replication','{status}',current_date)"""
        print(audit_batch_status_query)
        postgres_cursor.execute(audit_batch_status_query)
        postgres_connection.commit()
        print("Record inserted into audit batch status table")
        return None
    except Exception as e:
        print(f"Error inserting record into audit batch status table: {str(e)}")
        sys.exit(4)
    finally:
        # Return the connection to the pool
        if postgres_connection:
            postgres_cursor.close()
            PgresPool.putconn(postgres_connection)

# In[8]: Function to get active rows from the master (control) table
def get_active_tables(mstr_schema, app_name):
    postgres_connection = PgresPool.getconn()
    list_sql = f"""
    SELECT application_name, source_schema_name, source_table_name, target_schema_name, target_table_name,
           truncate_flag, cdc_flag, full_inc_flag, cdc_column, replication_order, customsql_ind, customsql_query
    FROM {mstr_schema}.cdc_master_table_list c
    WHERE active_ind = 'Y' AND application_name = '{app_name}'
    ORDER BY replication_order, source_table_name
    """
    try:
        with postgres_connection.cursor() as curs:
            curs.execute(list_sql)
            rows = curs.fetchall()
        postgres_connection.commit()
        PgresPool.putconn(postgres_connection)
        return rows
    except Exception as e:
        print(f"Error selecting records from cdc_master_table_list table: {str(e)}")
        sys.exit(5)

# In[9]: Function to extract data from the source Postgres database
def extract_from_srcpg(table_name, source_schema, customsql_ind, customsql_query):
    # Acquire a connection from the pool
    srcpostgres_connection = SrcPgresPool.getconn()
    srcpostgres_cursor = srcpostgres_connection.cursor()
    try:
        if customsql_ind == "Y":
            # Use the custom extraction query from the master table
            sql_query = customsql_query
        else:
            # Fall back to a full-table extract
            sql_query = f'SELECT * FROM {source_schema}.{table_name}'
        print(sql_query)
        srcpostgres_cursor.execute(sql_query)
        rows = srcpostgres_cursor.fetchall()
        return rows

    except Exception as e:
        audit_batch_status_insert(table_name, 'failed')
        print(f"Error extracting data from source Postgres: {str(e)}")
        return []

    finally:
        # Return the connection to the pool
        if srcpostgres_connection:
            srcpostgres_cursor.close()
            SrcPgresPool.putconn(srcpostgres_connection)

# In[10]: Function to load extracted rows into the target Postgres database
def load_into_postgres(table_name, data, target_schema):
    postgres_connection = PgresPool.getconn()
    try:
        # Truncate existing data in the target table before the full reload
        truncate_query = f'TRUNCATE TABLE {target_schema}.{table_name}'

        # Build the INSERT query with one placeholder per column
        insert_query = f'INSERT INTO {target_schema}.{table_name} VALUES ({", ".join(["%s"] * len(data[0]))})'

        # Use execute_batch for efficient batch inserts
        with postgres_connection.cursor() as cursor:
            cursor.execute(truncate_query)
            data_to_insert = [tuple(row) for row in data]
            execute_batch(cursor, insert_query, data_to_insert)
        postgres_connection.commit()
        # Record success in the audit batch status table
        audit_batch_status_insert(table_name, 'success')

    except Exception as e:
        print(f"Error loading data into PostgreSQL: {str(e)}")
        audit_batch_status_insert(table_name, 'failed')
    finally:
        # Return the connection to the pool
        if postgres_connection:
            PgresPool.putconn(postgres_connection)

# In[11]: Function to run the extract and load steps for one table
def load_data_from_src_tgt(table_name, source_schema, target_schema, customsql_ind, customsql_query):
    # Extract data from the source Postgres database
    print(f'Source: Thread {table_name} started at ' + datetime.now().strftime("%H:%M:%S"))
    srcpg_data = extract_from_srcpg(table_name, source_schema, customsql_ind, customsql_query)
    print(f'Source: Extraction for {table_name} completed at ' + datetime.now().strftime("%H:%M:%S"))

    if srcpg_data:
        # Load data into the target Postgres database
        load_into_postgres(table_name, srcpg_data, target_schema)
        print(f"Target: Data loaded into table: {table_name}")
        print(f'Target: Thread {table_name} ended at ' + datetime.now().strftime("%H:%M:%S"))


# Function to list tables that failed replication in today's batch
def check_failed_tables(mstr_schema, app_name, current_date):
    postgres_connection = PgresPool.getconn()
    list_sql = f"""
    SELECT object_name
    FROM {mstr_schema}.audit_batch_status c
    WHERE application_name = '{app_name}' AND batch_run_date = '{current_date}' AND object_execution_status = 'failed'
    """
    try:
        with postgres_connection.cursor() as curs:
            curs.execute(list_sql)
            rows = curs.fetchall()
        postgres_connection.commit()
        PgresPool.putconn(postgres_connection)
        return rows
    except Exception as e:
        print(f"Error selecting records from audit_batch_status table: {str(e)}")
        sys.exit(6)

# In[12]: Main ETL process - initialize concurrency
if __name__ == '__main__':
    active_tables_rows = get_active_tables(mstr_schema, app_name)
    # Each entry: (source_table_name, source_schema_name, target_schema_name, customsql_ind, customsql_query)
    tables_to_extract = [(row[2], row[1], row[3], row[10], row[11]) for row in active_tables_rows]

    print(f"Tables to extract are {tables_to_extract}")
    print(f'No of concurrent tasks: {concurrent_tasks}')
    # Delete audit entries for a rerun on the same day
    del_audit_entries_rerun(current_date)

    # Use ThreadPoolExecutor to replicate tables concurrently
    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_tasks) as executor:
        # Submit one task per table
        future_to_table = {
            executor.submit(load_data_from_src_tgt, table[0], table[1], table[2], table[3], table[4]): table
            for table in tables_to_extract
        }

        # Wait for all tasks to complete
        concurrent.futures.wait(future_to_table)

        # Surface any exceptions raised inside the tasks
        for future in future_to_table:
            table_name = future_to_table[future]
            try:
                future.result()
            except Exception as e:
                print(f"Error replicating {table_name}: {e}")
                audit_batch_status_insert(table_name[0], 'failed')

    get_failed_tables_rows = check_failed_tables(mstr_schema, app_name, current_date)

    if len(get_failed_tables_rows) > 0:
        print(f"Some tables failed to replicate for application: {app_name}")
        sys.exit(7)
    else:
        print(f"All tables have been successfully replicated for application: {app_name}")

    # Record end time and release both connection pools
    end = time.time()
    SrcPgresPool.closeall()
    PgresPool.closeall()

    print("ETL process completed successfully.")
    print("The time of execution of the program is:", (end - start), "secs")

    sys.exit(0)
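
Note: the script assumes two control tables already exist in the target database, {mstr_schema}.cdc_master_table_list and {mstr_schema}.audit_batch_status. Their DDL is not part of this commit; the sketch below is inferred from the queries above, so the column types and the "ods_mstr" schema name are assumptions.

-- Hypothetical DDL inferred from the script's SQL; types and schema name are guesses.
CREATE TABLE ods_mstr.cdc_master_table_list (
    application_name   text,
    source_schema_name text,
    source_table_name  text,
    target_schema_name text,
    target_table_name  text,
    truncate_flag      text,
    cdc_flag           text,
    full_inc_flag      text,
    cdc_column         text,
    replication_order  integer,
    customsql_ind      text,  -- 'Y' makes the extract use customsql_query
    customsql_query    text,
    active_ind         text   -- only 'Y' rows are replicated
);

CREATE TABLE ods_mstr.audit_batch_status (
    object_name             text,  -- table being replicated
    application_name        text,
    batch_job_type          text,  -- always 'replication' from this script
    object_execution_status text,  -- 'success' or 'failed'
    batch_run_date          date
);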
@@ -0,0 +1,12 @@
oracledb==1.3.1
psycopg2==2.9.6
pandas==2.0.2
openpyxl==3.1.2
configparser==6.0.0
dbt-postgres==1.6.4
PyYAML==6.0
pyodbc==4.0.39
python-dotenv==1.0.0
boto3==1.28.10
requests==2.31.0
XlsxWriter==3.1.2