diff --git a/doc/operation-class.md b/doc/operation-class.md index 1d7c08d..e9522a7 100644 --- a/doc/operation-class.md +++ b/doc/operation-class.md @@ -27,7 +27,7 @@ class MessageRetentionOperation(BaseOperation): INSERT INTO {self.source_db}.{self.destination_table}({self.source_columns}) SELECT {self.source_columns} FROM {self.source_db}.{self.source_table} AS source - WHERE source. BETWEEN {start_pk} AND {end_pk} + WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk} AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY) """ def _get_not_imported_pks_query(self, start_pk, end_pk): diff --git a/src/sbosc/controller/initializer.py b/src/sbosc/controller/initializer.py index dffbd3b..788051e 100644 --- a/src/sbosc/controller/initializer.py +++ b/src/sbosc/controller/initializer.py @@ -186,7 +186,9 @@ def fetch_metadata(self, redis_data): self.logger.info("Saved primary key column to Redis") # Get max id - cursor.execute("SELECT MAX(%s) FROM %s.%s" % (metadata.pk_column, metadata.source_db, metadata.source_table)) + cursor.execute(''' + SELECT MAX(%s) FROM %s.%s + ''' % (metadata.pk_column, metadata.source_db, metadata.source_table)) max_pk = cursor.fetchone()[0] if max_pk is None: raise Exception("No data in source table") diff --git a/src/sbosc/controller/validator.py b/src/sbosc/controller/validator.py index ca8cdd5..96b5f78 100644 --- a/src/sbosc/controller/validator.py +++ b/src/sbosc/controller/validator.py @@ -153,13 +153,15 @@ def __execute_apply_dml_events_validation_query( if event_pks: event_pks_str = ','.join([str(pk) for pk in event_pks]) dest_cursor.execute(f''' - SELECT {metadata.pk_column} FROM {metadata.destination_db}.{metadata.destination_table} WHERE {metadata.pk_column} IN ({event_pks_str}) + SELECT {metadata.pk_column} FROM {metadata.destination_db}.{metadata.destination_table} + WHERE {metadata.pk_column} IN ({event_pks_str}) ''') not_deleted_pks = set([row[0] for row in dest_cursor.fetchall()]) if dest_cursor.rowcount > 0: # Check if deleted pks are reinserted source_cursor.execute(f''' - SELECT {metadata.pk_column} FROM {metadata.source_db}.{metadata.source_table} WHERE {metadata.pk_column} IN ({event_pks_str}) + SELECT {metadata.pk_column} FROM {metadata.source_db}.{metadata.source_table} + WHERE {metadata.pk_column} IN ({event_pks_str}) ''') reinserted_pks = set([row[0] for row in source_cursor.fetchall()]) if reinserted_pks: diff --git a/src/sbosc/operations/base.py b/src/sbosc/operations/base.py index 66293b3..2eda1a6 100644 --- a/src/sbosc/operations/base.py +++ b/src/sbosc/operations/base.py @@ -41,7 +41,8 @@ def apply_update(self, db, updated_pks): def _get_not_imported_pks_query(self, start_pk, end_pk): return f''' SELECT source.{self.pk_column} FROM {self.source_db}.{self.source_table} AS source - LEFT JOIN {self.destination_db}.{self.destination_table} AS dest ON source.{self.pk_column} = dest.{self.pk_column} + LEFT JOIN {self.destination_db}.{self.destination_table} AS dest + ON source.{self.pk_column} = dest.{self.pk_column} WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk} AND dest.{self.pk_column} IS NULL '''