Skip to content

Commit

Permalink
change the exit codes for the pg2pg script
Browse files Browse the repository at this point in the history
  • Loading branch information
vishreddy01 committed Apr 18, 2024
1 parent d04efa5 commit 5ef9afd
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions shared/ods_replication_pg2pg/data_replication_pg2pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,30 @@ def load_data_from_src_tgt(table_name,source_schema,target_schema,customsql_ind,
print(f"Target: Data loaded into table: {table_name}")
print(f'Target: Thread {table_name} ended at ' + datetime.now().strftime("%H:%M:%S"))

def check_failed_tables(mstr_schema,app_name,current_date):
postgres_connection = PgresPool.getconn()
postgres_cursor = postgres_connection.cursor()
list_sql = f"""
SELECT object_name
from {mstr_schema}.audit_batch_status c
where application_name='{app_name}' and batch_run_date='{current_date}' and object_execution_status='failed'
order by replication_order, source_table_name
"""
try:
with postgres_connection.cursor() as curs:
curs.execute(list_sql)
rows = curs.fetchall()
postgres_connection.commit()
postgres_cursor.close()
PgresPool.putconn(postgres_connection)
return rows
except Exception as e:
print(f"Error selecting record from cdc_master_table_list table: {str(e)}")
sys.exit(6)
return None



# In[12]: Initializing concurrency
if __name__ == '__main__':
# Main ETL process
Expand Down Expand Up @@ -252,6 +276,15 @@ def load_data_from_src_tgt(table_name,source_schema,target_schema,customsql_ind,
print(f"Error replicating {table_name}: {e}")
audit_batch_status_insert(table_name[0],'failed')

get_failed_tables_rows = check_failed_tables(mstr_schema,app_name,current_date)

if len(get_failed_tables_rows)>0:
print("Some of the tables are failed while replication for an application:",{app_name})
sys.exit(7)
else:
print("All the tables have been successfully replicated for an application:",{app_name})


# record end time
end = time.time()
SrcPgresPool.closeall()
Expand Down

0 comments on commit 5ef9afd

Please sign in to comment.