Merge pull request #4 from bcgov/ods-replication-pg2pg
PostgreSQL to PostgreSQL Replication Function
abimichel authored Jul 12, 2024
2 parents a7aa725 + 6fc8b7c commit 92f17e9
Showing 3 changed files with 187 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docker-pg2pg.yaml
@@ -2,7 +2,7 @@ name: Push to GHCR

on:
push:
branches: [ "ods-replication-pg2pg" ]
branches: ["main"]

env:
# DF-NOTE: pull ghcr.io/bcgov/nr-dap-ods-trino:main
9 changes: 2 additions & 7 deletions .github/workflows/docker-pipeline.yaml
@@ -1,12 +1,8 @@
name: Push to GHCR

on:
# schedule:
# - cron: '19 22 * * *'
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]
branches: [ "ods-replication-pg2pg" ]

env:
REGISTRY: ghcr.io
@@ -15,7 +11,6 @@ env:

jobs:
build:

runs-on: ubuntu-latest
permissions:
contents: read
@@ -65,4 +60,4 @@ jobs:
TAGS: ${{ steps.meta.outputs.tags }}
DIGEST: ${{ steps.build-and-push.outputs.digest }}

run: echo "${TAGS}" | xargs -I {} cosign sign --yes {}@${DIGEST}
run: echo "${TAGS}" | xargs -I {} cosign sign --yes {}@${DIGEST}
187 changes: 184 additions & 3 deletions shared/ods_replication_pg2pg/data_replication_pg2pg.py
@@ -127,7 +127,7 @@ def get_active_tables(mstr_schema,app_name):
list_sql = f"""
SELECT application_name,source_schema_name,source_table_name,target_schema_name,target_table_name,truncate_flag,cdc_flag,full_inc_flag,cdc_column,replication_order,customsql_ind,customsql_query
from {mstr_schema}.cdc_master_table_list c
where active_ind = 'Y' and application_name='{app_name}'
where active_ind = 'Y' and application_name='{app_name}' AND replication_source = 'postgresql'
order by replication_order, source_table_name
"""
try:
@@ -242,7 +242,182 @@ def check_failed_tables(mstr_schema,app_name,current_date):
print(f"Error selecting record from cdc_master_table_list table: {str(e)}")
sys.exit(6)
return None


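# Synchronize the structure of each target table before data replication: create the target
# table (and its column comments) when it does not exist; otherwise compare the generated
# source and target DDL and drop/recreate the target table when the two differ.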
def get_table_ddl(tables_to_extract):
try:
for table in tables_to_extract:
if table_exists(table[3], table[2]) is False:
source_ddl = get_table_ddl_from_source(table)
#print(source_ddl)
create_table(source_ddl, table)
source_comments = get_table_comments_from_source(table)
#print(source_comments)
if source_comments is not None:
for row in source_comments:
create_table_comments(row[0], table)
print(f"The {table[3]}.{table[2]} table did not exist in the target database and has been created.")
else:
target_ddl = get_table_ddl_from_target(table)
source_ddl = get_table_ddl_from_source(table)
if source_ddl == target_ddl:
print(f"The {table[1]}.{table[0]} and {table[3]}.{table[2]} tables are identical.")
else:
drop_table(table)
create_table(source_ddl, table)
print(f"The {table[1]}.{table[0]} and {table[3]}.{table[2]} tables are different. The target table has been updated to match the source table.")
except Exception as e:
print(f"Error in get_table_ddl: {e}")
sys.exit(7)
return None

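# Build a canonical CREATE TABLE statement for the source table from information_schema,
# qualified with the target schema name (table[3]) so it can be compared with, and run
# against, the target database.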
def get_table_ddl_from_source(table):
srcpostgres_connection = SrcPgresPool.getconn()
srcpostgres_cursor = srcpostgres_connection.cursor()
srcpostgres_cursor.execute("""
SELECT
'CREATE TABLE ' || quote_ident(%s::text) || '.' || quote_ident(t.relname) || ' (' ||
string_agg(
quote_ident(c.column_name) || ' ' ||
CASE
WHEN c.data_type = 'character varying' THEN c.data_type || '(' || c.character_maximum_length|| ')'
WHEN c.data_type = 'numeric' THEN c.data_type || '(' || c.numeric_precision || ',' || c.numeric_scale || ')'
ELSE c.data_type
END ||
CASE
WHEN c.column_default IS NOT NULL THEN ' DEFAULT ' || c.column_default
ELSE ''
END ||
CASE
WHEN c.is_nullable = 'NO' THEN ' NOT NULL'
ELSE ''
END,
', '
ORDER BY c.ordinal_position
) ||
');' AS ddl
FROM
pg_class t
JOIN
information_schema.columns c ON t.relname = c.table_name AND t.relnamespace = c.table_schema::regnamespace
WHERE
t.relkind = 'r'
AND t.relnamespace = %s::regnamespace
AND t.relname = %s
GROUP BY
t.relnamespace, t.relname;
""", (table[3],table[1], table[0]))
ddl = srcpostgres_cursor.fetchone()[0]
srcpostgres_cursor.close()
SrcPgresPool.putconn(srcpostgres_connection)
return ddl

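# Generate COMMENT ON COLUMN statements from the source table's column descriptions,
# rewritten against the target schema so they can be replayed on the target database.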
def get_table_comments_from_source(table):
srcpostgres_connection_c = SrcPgresPool.getconn()
srcpostgres_cursor_c = srcpostgres_connection_c.cursor()
srcpostgres_cursor_c.execute("""
select
'COMMENT ON COLUMN ' || %s||'.'||c.table_name||'.'||c.column_name||' IS '''||pgd.description||''';' as comments
from pg_catalog.pg_statio_all_tables as st
inner join pg_catalog.pg_description pgd on (
pgd.objoid = st.relid
)
inner join information_schema.columns c on (
pgd.objsubid = c.ordinal_position and
c.table_schema = st.schemaname and
c.table_name = st.relname
)
where c.table_schema=%s and c.table_name=%s;
""", (table[3],table[1], table[0]))
comments = srcpostgres_cursor_c.fetchall()
print(comments)
srcpostgres_cursor_c.close()
SrcPgresPool.putconn(srcpostgres_connection_c)
return comments

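# Rebuild the target table's CREATE TABLE statement in the same canonical form as the
# source-side query so the two DDL strings can be compared for equality.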
def get_table_ddl_from_target(table):
postgres_connection = PgresPool.getconn()
postgres_cursor = postgres_connection.cursor()
postgres_cursor.execute("""
SELECT
'CREATE TABLE ' || quote_ident(t.relnamespace::regnamespace::text) || '.' || quote_ident(t.relname) || ' (' ||
string_agg(
quote_ident(c.column_name) || ' ' ||
CASE
WHEN c.data_type = 'character varying' THEN c.data_type || '(' || c.character_maximum_length || ')'
WHEN c.data_type = 'numeric' THEN c.data_type || '(' || c.numeric_precision || ',' || c.numeric_scale || ')'
ELSE c.data_type
END ||
CASE
WHEN c.column_default IS NOT NULL THEN ' DEFAULT ' || c.column_default
ELSE ''
END ||
CASE
WHEN c.is_nullable = 'NO' THEN ' NOT NULL'
ELSE ''
END,
', '
ORDER BY c.ordinal_position
) ||
');' AS ddl
FROM
pg_class t
JOIN
information_schema.columns c ON t.relname = c.table_name AND t.relnamespace = c.table_schema::regnamespace
WHERE
t.relkind = 'r'
AND t.relnamespace = %s::regnamespace
AND t.relname = %s
GROUP BY
t.relnamespace, t.relname;
""", (table[3], table[2]))
ddl = postgres_cursor.fetchone()[0]
postgres_cursor.close()
PgresPool.putconn(postgres_connection)
return ddl

def table_exists(table_schema, table_name):
postgres_connection = PgresPool.getconn()
postgres_cursor = postgres_connection.cursor()
postgres_cursor.execute("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = %s and table_name = %s)", (table_schema, table_name))
result = postgres_cursor.fetchone()[0]
postgres_cursor.close()
PgresPool.putconn(postgres_connection)
return result

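# Drop the existing target table (CASCADE) so it can be recreated from the source DDL;
# exits with code 8 if the drop fails.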
def drop_table(table):
postgres_connection = PgresPool.getconn()
postgres_cursor = postgres_connection.cursor()
try:
with postgres_connection.cursor() as curs:
#postgres_cursor.execute(f"drop table {table[1]}.{table[0]} cascade;")
curs.execute(f"drop table {table[3]}.{table[2]} cascade;")
postgres_connection.commit()
postgres_cursor.close()
PgresPool.putconn(postgres_connection)
except Exception as e:
print("Error dropping table: ", str(e))
sys.exit(8)
return None

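# Execute a generated DDL statement against the target database using a pooled connection.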
def create_table(ddl, table):
postgres_connection = PgresPool.getconn()
postgres_cursor = postgres_connection.cursor()
with postgres_connection.cursor() as curs:
curs.execute(ddl)
postgres_connection.commit()
postgres_cursor.close()
PgresPool.putconn(postgres_connection)

def create_table_comments(row, table):
postgres_connection = PgresPool.getconn()
postgres_cursor = postgres_connection.cursor()
with postgres_connection.cursor() as curs:
curs.execute(row)
postgres_connection.commit()
postgres_cursor.close()
PgresPool.putconn(postgres_connection)


# In[12]: Initializing concurrency
@@ -256,6 +431,12 @@ def check_failed_tables(mstr_schema,app_name,current_date):
print(f'No of concurrent tasks:{concurrent_tasks}')
#Delete audit entries for rerun on same day
del_audit_entries_rerun(current_date)

#Verify and recreate the DDL
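# Each tuple is (source_table_name, source_schema_name, target_table_name, target_schema_name),
# taken from the active rows of cdc_master_table_list.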
ddls_to_extract = [(row[2],row[1],row[4],row[3]) for row in active_tables_rows]
#print(ddls_to_extract)
get_table_ddl(ddls_to_extract)

# Using ThreadPoolExecutor to run tasks concurrently
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_tasks) as executor:
# Submit tasks to the executor
@@ -279,7 +460,7 @@ def check_failed_tables(mstr_schema,app_name,current_date):

if len(get_failed_tables_rows)>0:
print("Some of the tables are failed while replication for an application:",{app_name})
sys.exit(7)
sys.exit(9)
else:
print("All the tables have been successfully replicated for an application:",{app_name})

