From f84e6a792a74946bf192282fb2836cf8bd416257 Mon Sep 17 00:00:00 2001 From: Jimmy Kim Date: Wed, 26 Jun 2024 10:22:51 +0900 Subject: [PATCH] Handle IntCastingNaNError on rematched_updated_pks --- src/sbosc/operations/base.py | 39 +++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/src/sbosc/operations/base.py b/src/sbosc/operations/base.py index e296ea8..98d1100 100644 --- a/src/sbosc/operations/base.py +++ b/src/sbosc/operations/base.py @@ -230,24 +230,27 @@ def get_not_updated_pks(self, source_cursor, dest_cursor, start_timestamp, end_t def get_rematched_updated_pks(self, db, not_updated_pks): not_updated_pks_str = ','.join([str(pk) for pk in not_updated_pks]) - with db.cursor(host='source', role='reader') as cursor: - cursor: Cursor - cursor.execute(f''' - SELECT {self.source_columns} FROM {self.source_db}.{self.source_table} - WHERE id IN ({not_updated_pks_str}) - ''') - source_df = pd.DataFrame(cursor.fetchall(), columns=[c[0] for c in cursor.description]) - with db.cursor(host='dest', role='reader') as cursor: - cursor: Cursor - cursor.execute(f''' - SELECT {self.source_columns} FROM {self.destination_db}.{self.destination_table} - WHERE id IN ({not_updated_pks_str}) - ''') - dest_df = pd.DataFrame(cursor.fetchall(), columns=[c[0] for c in cursor.description]) - - dest_df = dest_df.astype(source_df.dtypes.to_dict()) - merged_df = source_df.merge(dest_df, how='inner', on=source_df.columns.tolist(), indicator=True) - rematched_pks = set(merged_df[merged_df['_merge'] == 'both']['id'].tolist()) + # Get rematched_pks + try: + with db.cursor(host='source', role='reader') as cursor: + cursor: Cursor + cursor.execute(f''' + SELECT {self.source_columns} FROM {self.source_db}.{self.source_table} + WHERE id IN ({not_updated_pks_str}) + ''') + source_df = pd.DataFrame(cursor.fetchall(), columns=[c[0] for c in cursor.description]) + with db.cursor(host='dest', role='reader') as cursor: + cursor: Cursor + cursor.execute(f''' + SELECT {self.source_columns} FROM {self.destination_db}.{self.destination_table} + WHERE id IN ({not_updated_pks_str}) + ''') + dest_df = pd.DataFrame(cursor.fetchall(), columns=[c[0] for c in cursor.description]) + dest_df = dest_df.astype(source_df.dtypes.to_dict()) + merged_df = source_df.merge(dest_df, how='inner', on=source_df.columns.tolist(), indicator=True) + rematched_pks = set(merged_df[merged_df['_merge'] == 'both']['id'].tolist()) + except pd.errors.IntCastingNaNError: + rematched_pks = set() # add deleted pks with db.cursor(host='source', role='reader') as cursor: cursor.execute(f'''