Skip to content

Commit

Permalink
lint fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jjh-kim committed Nov 27, 2024
1 parent 72b0b1b commit 57b67c7
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 5 deletions.
2 changes: 1 addition & 1 deletion doc/operation-class.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class MessageRetentionOperation(BaseOperation):
INSERT INTO {self.source_db}.{self.destination_table}({self.source_columns})
SELECT {self.source_columns}
FROM {self.source_db}.{self.source_table} AS source
WHERE source. BETWEEN {start_pk} AND {end_pk}
WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
"""
def _get_not_imported_pks_query(self, start_pk, end_pk):
Expand Down
4 changes: 3 additions & 1 deletion src/sbosc/controller/initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ def fetch_metadata(self, redis_data):
self.logger.info("Saved primary key column to Redis")

# Get max id
cursor.execute("SELECT MAX(%s) FROM %s.%s" % (metadata.pk_column, metadata.source_db, metadata.source_table))
cursor.execute('''
SELECT MAX(%s) FROM %s.%s
''' % (metadata.pk_column, metadata.source_db, metadata.source_table))
max_pk = cursor.fetchone()[0]
if max_pk is None:
raise Exception("No data in source table")
Expand Down
6 changes: 4 additions & 2 deletions src/sbosc/controller/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,15 @@ def __execute_apply_dml_events_validation_query(
if event_pks:
event_pks_str = ','.join([str(pk) for pk in event_pks])
dest_cursor.execute(f'''
SELECT {metadata.pk_column} FROM {metadata.destination_db}.{metadata.destination_table} WHERE {metadata.pk_column} IN ({event_pks_str})
SELECT {metadata.pk_column} FROM {metadata.destination_db}.{metadata.destination_table}
WHERE {metadata.pk_column} IN ({event_pks_str})
''')
not_deleted_pks = set([row[0] for row in dest_cursor.fetchall()])
if dest_cursor.rowcount > 0:
# Check if deleted pks are reinserted
source_cursor.execute(f'''
SELECT {metadata.pk_column} FROM {metadata.source_db}.{metadata.source_table} WHERE {metadata.pk_column} IN ({event_pks_str})
SELECT {metadata.pk_column} FROM {metadata.source_db}.{metadata.source_table}
WHERE {metadata.pk_column} IN ({event_pks_str})
''')
reinserted_pks = set([row[0] for row in source_cursor.fetchall()])
if reinserted_pks:
Expand Down
3 changes: 2 additions & 1 deletion src/sbosc/operations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def apply_update(self, db, updated_pks):
def _get_not_imported_pks_query(self, start_pk, end_pk):
return f'''
SELECT source.{self.pk_column} FROM {self.source_db}.{self.source_table} AS source
LEFT JOIN {self.destination_db}.{self.destination_table} AS dest ON source.{self.pk_column} = dest.{self.pk_column}
LEFT JOIN {self.destination_db}.{self.destination_table} AS dest
ON source.{self.pk_column} = dest.{self.pk_column}
WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
AND dest.{self.pk_column} IS NULL
'''
Expand Down

0 comments on commit 57b67c7

Please sign in to comment.