diff --git a/shared/ods_replication_pg2pg/data_replication_pg2pg.py b/shared/ods_replication_pg2pg/data_replication_pg2pg.py
index 4a8222d..b998396 100644
--- a/shared/ods_replication_pg2pg/data_replication_pg2pg.py
+++ b/shared/ods_replication_pg2pg/data_replication_pg2pg.py
@@ -127,7 +127,7 @@ def get_active_tables(mstr_schema,app_name):
     list_sql = f"""
     SELECT application_name,source_schema_name,source_table_name,target_schema_name,target_table_name,truncate_flag,cdc_flag,full_inc_flag,cdc_column,replication_order,customsql_ind,customsql_query
     from {mstr_schema}.cdc_master_table_list c
-    where active_ind = 'Y' and application_name='{app_name}'
+    where active_ind = 'Y' and application_name='{app_name}' AND replication_source = 'postgresql'
     order by replication_order, source_table_name
     """
     try:
@@ -242,7 +242,174 @@ def check_failed_tables(mstr_schema,app_name,current_date):
         print(f"Error selecting record from cdc_master_table_list table: {str(e)}")
         sys.exit(6)
     return None
-    
+
+def _quote_ident(identifier):
+    """Return identifier double-quoted so it can be embedded in a SQL statement."""
+    return '"' + str(identifier).replace('"', '""') + '"'
+
+# Builds a CREATE TABLE statement for one table from information_schema.columns.
+# Parameters: schema and table name to emit in the DDL, then schema and table name to read.
+# Length/precision suffixes are only appended when present: a NULL operand would turn
+# the whole column definition into NULL and string_agg would silently skip that column.
+_TABLE_DDL_SQL = """
+    SELECT
+        'CREATE TABLE ' || quote_ident(%s::text) || '.' || quote_ident(%s::text) || ' (' ||
+        string_agg(
+            quote_ident(c.column_name) || ' ' ||
+            CASE
+                WHEN c.data_type = 'character varying' AND c.character_maximum_length IS NOT NULL
+                    THEN c.data_type || '(' || c.character_maximum_length || ')'
+                WHEN c.data_type = 'numeric' AND c.numeric_precision IS NOT NULL
+                    THEN c.data_type || '(' || c.numeric_precision || ',' || COALESCE(c.numeric_scale, 0) || ')'
+                ELSE c.data_type
+            END ||
+            CASE
+                WHEN c.column_default IS NOT NULL THEN ' DEFAULT ' || c.column_default
+                ELSE ''
+            END ||
+            CASE
+                WHEN c.is_nullable = 'NO' THEN ' NOT NULL'
+                ELSE ''
+            END,
+            ', '
+            ORDER BY c.ordinal_position
+        ) ||
+        ');' AS ddl
+    FROM
+        pg_class t
+    JOIN
+        information_schema.columns c ON t.relname = c.table_name AND t.relnamespace = c.table_schema::regnamespace
+    WHERE
+        t.relkind = 'r'
+        AND t.relnamespace = %s::regnamespace
+        AND t.relname = %s
+    GROUP BY
+        t.relnamespace, t.relname;
+"""
+
+def _fetch_table_ddl(pool, ddl_schema, ddl_table, lookup_schema, lookup_table):
+    """Return the CREATE TABLE statement describing lookup_schema.lookup_table.
+
+    The statement names the table ddl_schema.ddl_table, so the DDL read from the
+    source and the DDL read from the target are directly comparable.
+    Raises ValueError when the table is not found in the queried database.
+    """
+    connection = pool.getconn()
+    try:
+        with connection.cursor() as cursor:
+            cursor.execute(_TABLE_DDL_SQL, (ddl_schema, ddl_table, lookup_schema, lookup_table))
+            row = cursor.fetchone()
+    finally:
+        # Always hand the connection back, even when the query fails.
+        pool.putconn(connection)
+    if row is None:
+        raise ValueError(f"Table {lookup_schema}.{lookup_table} was not found.")
+    return row[0]
+
+def _execute_on_target(statement):
+    """Run a single statement on the target database and commit it."""
+    connection = PgresPool.getconn()
+    try:
+        with connection.cursor() as cursor:
+            cursor.execute(statement)
+        connection.commit()
+    finally:
+        PgresPool.putconn(connection)
+
+def _copy_column_comments(table):
+    """Recreate the source table's column comments on the target table."""
+    for row in get_table_comments_from_source(table):
+        create_table_comments(row[0], table)
+
+def get_table_ddl(tables_to_extract):
+    """Create or rebuild each target table so its DDL matches the source table.
+
+    Each item is (source_table, source_schema, target_table, target_schema).
+    A missing target table is created; a target table whose DDL differs is
+    dropped (cascade) and recreated. Exits with code 7 on any error.
+    """
+    try:
+        for table in tables_to_extract:
+            source_ddl = get_table_ddl_from_source(table)
+            if not table_exists(table[3], table[2]):
+                create_table(source_ddl, table)
+                _copy_column_comments(table)
+                print(f"The {table[3]}.{table[2]} table did not exist in the target database and has been created.")
+            elif source_ddl == get_table_ddl_from_target(table):
+                print(f"The {table[1]}.{table[0]} and {table[3]}.{table[2]} tables are identical.")
+            else:
+                drop_table(table)
+                create_table(source_ddl, table)
+                # The drop removed the column comments too, so restore them.
+                _copy_column_comments(table)
+                print(f"The {table[1]}.{table[0]} and {table[3]}.{table[2]} tables are different. The target table has been updated to match the source table.")
+    except Exception as e:
+        print(f"Error in get_table_ddl: {e}")
+        sys.exit(7)
+    return None
+
+def get_table_ddl_from_source(table):
+    """Return the source table's DDL, written against the target schema and table name."""
+    return _fetch_table_ddl(SrcPgresPool, table[3], table[2], table[1], table[0])
+
+def get_table_comments_from_source(table):
+    """Return rows holding one COMMENT ON COLUMN statement per commented source column.
+
+    The statements target the target schema and table name; identifiers and the
+    comment text are quoted by the database so embedded quotes cannot break them.
+    """
+    connection = SrcPgresPool.getconn()
+    try:
+        with connection.cursor() as cursor:
+            cursor.execute("""
+                SELECT
+                    'COMMENT ON COLUMN ' || quote_ident(%s::text) || '.' || quote_ident(%s::text) || '.' ||
+                    quote_ident(c.column_name) || ' IS ' || quote_literal(pgd.description) || ';' AS comments
+                FROM pg_catalog.pg_statio_all_tables AS st
+                INNER JOIN pg_catalog.pg_description pgd ON (
+                    pgd.objoid = st.relid
+                )
+                INNER JOIN information_schema.columns c ON (
+                    pgd.objsubid = c.ordinal_position AND
+                    c.table_schema = st.schemaname AND
+                    c.table_name = st.relname
+                )
+                WHERE c.table_schema = %s AND c.table_name = %s;
+            """, (table[3], table[2], table[1], table[0]))
+            return cursor.fetchall()
+    finally:
+        SrcPgresPool.putconn(connection)
+
+def get_table_ddl_from_target(table):
+    """Return the DDL of the existing target table."""
+    return _fetch_table_ddl(PgresPool, table[3], table[2], table[3], table[2])
+
+def table_exists(table_schema, table_name):
+    """Return True when table_schema.table_name exists in the target database."""
+    connection = PgresPool.getconn()
+    try:
+        with connection.cursor() as cursor:
+            cursor.execute("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = %s and table_name = %s)", (table_schema, table_name))
+            return cursor.fetchone()[0]
+    finally:
+        PgresPool.putconn(connection)
+
+def drop_table(table):
+    """Drop the target table (cascade); exits with code 8 when the drop fails."""
+    try:
+        _execute_on_target(f"drop table {_quote_ident(table[3])}.{_quote_ident(table[2])} cascade;")
+    except Exception as e:
+        print("Error dropping table: ", str(e))
+        sys.exit(8)
+    return None
+
+def create_table(ddl, table):
+    """Execute the CREATE TABLE statement ddl on the target database."""
+    _execute_on_target(ddl)
+
+def create_table_comments(row, table):
+    """Execute one COMMENT ON COLUMN statement on the target database."""
+    _execute_on_target(row)
 
 # In[12]: Initializing concurrency
 
@@ -256,6 +423,12 @@ def check_failed_tables(mstr_schema,app_name,current_date):
     print(f'No of concurrent tasks:{concurrent_tasks}')
     #Delete audit entries for rerun on same day
     del_audit_entries_rerun(current_date)
+
+    #Verify and recreate the DDL
+    ddls_to_extract = [(row[2],row[1],row[4],row[3]) for row in active_tables_rows]
+    #print(ddls_to_extract)
+    get_table_ddl(ddls_to_extract)
+
     # Using ThreadPoolExecutor to run tasks concurrently
     with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_tasks) as executor:
         # Submit tasks to the executor
@@ -279,7 +452,7 @@ def check_failed_tables(mstr_schema,app_name,current_date):
 
     if len(get_failed_tables_rows)>0:
         print("Some of the tables are failed while replication for an application:",{app_name})
-        sys.exit(7)
+        sys.exit(9)
     else:
         print("All the tables have been successfully replicated for an application:",{app_name})
 