Merge pull request #28 from sendbird/feature/auto-detect-pk-column
Auto detect PK column
jjh-kim authored Nov 27, 2024
2 parents 38d9d9a + 57b67c7 commit 20e2783
Showing 13 changed files with 113 additions and 79 deletions.
doc/operation-class.md (23 changes: 12 additions & 11 deletions)
@@ -27,16 +27,17 @@ class MessageRetentionOperation(BaseOperation):
             INSERT INTO {self.source_db}.{self.destination_table}({self.source_columns})
             SELECT {self.source_columns}
             FROM {self.source_db}.{self.source_table} AS source
-            WHERE source.id BETWEEN {start_pk} AND {end_pk}
+            WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
         """

     def _get_not_imported_pks_query(self, start_pk, end_pk):
         return f'''
-            SELECT source.id FROM {self.source_db}.{self.source_table} AS source
-            LEFT JOIN {self.source_db}.{self.destination_table} AS dest ON source.id = dest.id
-            WHERE source.id BETWEEN {start_pk} AND {end_pk}
+            SELECT source.{self.pk_column} FROM {self.source_db}.{self.source_table} AS source
+            LEFT JOIN {self.source_db}.{self.destination_table} AS dest
+                ON source.{self.pk_column} = dest.{self.pk_column}
+            WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
-            AND dest.id IS NULL
+            AND dest.{self.pk_column} IS NULL
         '''
 ```
@@ -48,20 +49,20 @@ class CrossClusterMessageRetentionOperation(CrossClusterBaseOperation):
     def _select_batch_query(self, start_pk, end_pk):
         return f'''
             SELECT {self.source_columns} FROM {self.source_db}.{self.source_table}
-            WHERE id BETWEEN {start_pk} AND {end_pk}
+            WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
         '''

     def get_not_imported_pks(self, source_cursor, dest_cursor, start_pk, end_pk):
         source_cursor.execute(f'''
-            SELECT id FROM {self.source_db}.{self.source_table}
-            WHERE id BETWEEN {start_pk} AND {end_pk}
+            SELECT {self.pk_column} FROM {self.source_db}.{self.source_table}
+            WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
         ''')
         source_pks = [row[0] for row in source_cursor.fetchall()]
         dest_cursor.execute(f'''
-            SELECT id FROM {self.destination_db}.{self.destination_table}
-            WHERE id BETWEEN {start_pk} AND {end_pk}
+            SELECT {self.pk_column} FROM {self.destination_db}.{self.destination_table}
+            WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
         ''')
         dest_pks = [row[0] for row in dest_cursor.fetchall()]
@@ -89,7 +90,7 @@ class MessageRetentionOperation(BaseOperation):
             INSERT INTO {self.source_db}.{self.destination_table}({self.source_columns})
             SELECT {self.source_columns}
             FROM {self.source_db}.{self.source_table} AS source
-            WHERE source.id BETWEEN {start_pk} AND {end_pk}
+            WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL {self.operation_config.retention_days} DAY)
         """
 ```
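For readers of the operation-class doc, it may help to see how the updated f-string renders once a PK column has been detected. Below is a minimal sketch with made-up names; `message_id`, the schema/table names, and the PK range are placeholders, not values from this PR, and `self.pk_column` is presumably filled from the metadata the initializer stores, backticks included (see the initializer diff further down).

```python
# Placeholder values only: none of these names come from this PR.
pk_column = "`message_id`"          # stored backtick-quoted, as the initializer does
source_db, source_table = "chat", "message"
destination_table = "_message_new"
start_pk, end_pk = 1, 2000

query = f'''
    SELECT source.{pk_column} FROM {source_db}.{source_table} AS source
    LEFT JOIN {source_db}.{destination_table} AS dest
        ON source.{pk_column} = dest.{pk_column}
    WHERE source.{pk_column} BETWEEN {start_pk} AND {end_pk}
    AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
    AND dest.{pk_column} IS NULL
'''
print(query)  # renders source.`message_id`, which MySQL accepts
```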
src/modules/redis/schema.py (3 changes: 2 additions & 1 deletion)
@@ -28,7 +28,8 @@ class Metadata(Hash):
     destination_db: str
     destination_table: str
     source_columns: str
-    max_id: int
+    pk_column: str
+    max_pk: int
     start_datetime: datetime


src/sbosc/controller/controller.py (12 changes: 6 additions & 6 deletions)
@@ -63,14 +63,14 @@ def create_bulk_import_chunks(self):
         self.redis_data.remove_all_chunks()

         metadata = self.redis_data.metadata
-        max_id = metadata.max_id
+        max_pk = metadata.max_pk

         # chunk_count is determined by min_chunk_size and max_chunk_count
         # Each chunk will have min_chunk_size rows and the number of chunks should not exceed max_chunk_count
         min_chunk_size = config.MIN_CHUNK_SIZE
         max_chunk_count = config.MAX_CHUNK_COUNT  # Number of chunks means max number of worker threads
-        chunk_count = min(max_id // min_chunk_size, max_chunk_count)
-        chunk_size = max_id // chunk_count
+        chunk_count = min(max_pk // min_chunk_size, max_chunk_count)
+        chunk_size = max_pk // chunk_count

         # Create chunks
         # Each chunk will have a range of primary key values [start_pk, end_pk]
@@ -79,7 +79,7 @@ def create_bulk_import_chunks(self):
             start_pk = i * chunk_size + 1
             end_pk = (i + 1) * chunk_size
             if i == chunk_count - 1:
-                end_pk = max_id
+                end_pk = max_pk

             chunk_id = f"{self.migration_id}-{i}"
             chunk_info = self.redis_data.get_chunk_info(chunk_id)
@@ -112,7 +112,7 @@ def create_bulk_import_chunks(self):
         self.redis_data.set_current_stage(Stage.BULK_IMPORT)
         self.slack.send_message(
             subtitle="Bulk import started",
-            message=f"Max id: {max_id}\n"
+            message=f"Max PK: {max_pk}\n"
                     f"Chunk count: {chunk_count}\n"
                     f"Chunk size: {chunk_size}\n"
                     f"Batch size: {config.MIN_BATCH_SIZE}\n"
@@ -343,7 +343,7 @@ def swap_tables(self):
             old_source_table = f"{metadata.source_db}.{self.redis_data.old_source_table}"
             cursor.execute(f"RENAME TABLE {source_table} TO {old_source_table}")
             after_rename_table_timestamp = time.time()
-            cursor.execute(f"SELECT MAX(id) FROM {old_source_table}")
+            cursor.execute(f"SELECT MAX({metadata.pk_column}) FROM {old_source_table}")
             final_max_id = cursor.fetchone()[0]

         with self.validator.migration_operation.override_source_table(self.redis_data.old_source_table):
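Since the chunking now keys off `max_pk`, here is a small worked sketch of the chunk math from `create_bulk_import_chunks`, using assumed values for `max_pk`, `MIN_CHUNK_SIZE`, and `MAX_CHUNK_COUNT`; none of these numbers come from the PR or from SB-OSC defaults.

```python
# Illustrative only: assumed values, not SB-OSC's actual configuration.
max_pk = 1_000_000        # metadata.max_pk, i.e. MAX(pk_column) of the source table
min_chunk_size = 100_000  # config.MIN_CHUNK_SIZE (assumed)
max_chunk_count = 8       # config.MAX_CHUNK_COUNT (assumed)

chunk_count = min(max_pk // min_chunk_size, max_chunk_count)  # -> 8
chunk_size = max_pk // chunk_count                            # -> 125_000

for i in range(chunk_count):
    start_pk = i * chunk_size + 1
    end_pk = (i + 1) * chunk_size
    if i == chunk_count - 1:
        end_pk = max_pk  # last chunk absorbs any remainder from the integer division
    print(i, start_pk, end_pk)
```

Stretching the last chunk to `max_pk` keeps the leftover rows from the integer division inside the final PK range.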
src/sbosc/controller/initializer.py (21 changes: 18 additions & 3 deletions)
@@ -174,10 +174,25 @@ def fetch_metadata(self, redis_data):
         metadata.source_columns = cursor.fetchone()[0]
         self.logger.info("Saved source column schema to Redis")

+        # Get pk column
+        cursor.execute(f'''
+            SELECT COLUMN_NAME FROM information_schema.COLUMNS
+            WHERE TABLE_SCHEMA = '{metadata.source_db}' AND TABLE_NAME = '{metadata.source_table}'
+            AND COLUMN_KEY = 'PRI' AND EXTRA LIKE '%auto_increment%'
+        ''')
+        if cursor.rowcount == 0:
+            raise Exception("Auto increment primary key column not found")
+        metadata.pk_column = f"`{cursor.fetchone()[0]}`"
+        self.logger.info("Saved primary key column to Redis")
+
         # Get max id
-        cursor.execute("SELECT MAX(id) FROM %s.%s" % (metadata.source_db, metadata.source_table))
-        max_id = cursor.fetchone()[0]
-        metadata.max_id = max_id
+        cursor.execute('''
+            SELECT MAX(%s) FROM %s.%s
+        ''' % (metadata.pk_column, metadata.source_db, metadata.source_table))
+        max_pk = cursor.fetchone()[0]
+        if max_pk is None:
+            raise Exception("No data in source table")
+        metadata.max_pk = max_pk
         self.logger.info("Saved total rows to Redis")

         metadata.start_datetime = datetime.now()
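The detection query added above can be run on its own to preview which column SB-OSC would pick for a given table. A minimal sketch follows, assuming PyMySQL as the client; the connection settings and table names are placeholders, and the project itself may use a different MySQL driver.

```python
import pymysql

# Placeholder connection details and table names: adjust to your environment.
conn = pymysql.connect(host="127.0.0.1", user="root", password="secret", database="information_schema")
with conn.cursor() as cursor:
    cursor.execute('''
        SELECT COLUMN_NAME FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
        AND COLUMN_KEY = 'PRI' AND EXTRA LIKE '%%auto_increment%%'
    ''', ("my_db", "my_table"))
    row = cursor.fetchone()
    print(row[0] if row else "no auto-increment primary key found")
conn.close()
```

A table without an auto-increment primary key returns no rows here, which is exactly the case the new "Auto increment primary key column not found" error guards against.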
src/sbosc/controller/validator.py (10 changes: 6 additions & 4 deletions)
@@ -83,8 +83,8 @@ def bulk_import_validation(self):
         metadata = self.redis_data.metadata
         range_queue = Queue()
         start_pk = 0
-        while start_pk <= metadata.max_id:
-            range_queue.put((start_pk, min(start_pk + self.bulk_import_batch_size, metadata.max_id)))
+        while start_pk <= metadata.max_pk:
+            range_queue.put((start_pk, min(start_pk + self.bulk_import_batch_size, metadata.max_pk)))
             start_pk += self.bulk_import_batch_size + 1
         failed_pks = []

@@ -153,13 +153,15 @@ def __execute_apply_dml_events_validation_query(
         if event_pks:
             event_pks_str = ','.join([str(pk) for pk in event_pks])
             dest_cursor.execute(f'''
-                SELECT id FROM {metadata.destination_db}.{metadata.destination_table} WHERE id IN ({event_pks_str})
+                SELECT {metadata.pk_column} FROM {metadata.destination_db}.{metadata.destination_table}
+                WHERE {metadata.pk_column} IN ({event_pks_str})
             ''')
             not_deleted_pks = set([row[0] for row in dest_cursor.fetchall()])
             if dest_cursor.rowcount > 0:
                 # Check if deleted pks are reinserted
                 source_cursor.execute(f'''
-                    SELECT id FROM {metadata.source_db}.{metadata.source_table} WHERE id IN ({event_pks_str})
+                    SELECT {metadata.pk_column} FROM {metadata.source_db}.{metadata.source_table}
+                    WHERE {metadata.pk_column} IN ({event_pks_str})
                 ''')
                 reinserted_pks = set([row[0] for row in source_cursor.fetchall()])
                 if reinserted_pks:
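For context on the `bulk_import_validation` hunk above: it splits `[0, max_pk]` into inclusive, non-overlapping batch ranges. A small sketch with assumed numbers (neither `max_pk` nor the batch size is taken from this PR) shows what ends up in the queue:

```python
from queue import Queue

# Assumed values for illustration; the real ones come from Redis metadata and config.
max_pk = 2_500
bulk_import_batch_size = 1_000

range_queue = Queue()
start_pk = 0
while start_pk <= max_pk:
    range_queue.put((start_pk, min(start_pk + bulk_import_batch_size, max_pk)))
    start_pk += bulk_import_batch_size + 1

print(list(range_queue.queue))  # [(0, 1000), (1001, 2001), (2002, 2500)]
```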
src/sbosc/monitor/monitor.py (4 changes: 2 additions & 2 deletions)
@@ -306,8 +306,8 @@ def check_migration_status(self):
             if last_pk_inserted and last_pk_inserted >= chunk_info.start_pk:
                 inserted_rows += last_pk_inserted - chunk_info.start_pk

-        if self.redis_data.metadata.max_id:
-            bulk_import_progress = inserted_rows / self.redis_data.metadata.max_id * 100
+        if self.redis_data.metadata.max_pk:
+            bulk_import_progress = inserted_rows / self.redis_data.metadata.max_pk * 100
             self.metric_sender.submit('sb_osc_bulk_import_progress', bulk_import_progress)

         self.submit_event_handler_timestamps()
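The progress metric is a plain percentage of the PK range covered so far; because both sides of the division are PK values rather than row counts, it is an approximation when the auto-increment sequence has gaps. A tiny illustration with assumed figures:

```python
# Assumed figures for illustration only.
inserted_rows = 450_000   # sum of last_pk_inserted - chunk_info.start_pk across chunks
max_pk = 1_000_000        # self.redis_data.metadata.max_pk

if max_pk:
    bulk_import_progress = inserted_rows / max_pk * 100  # 45.0
    print(f"sb_osc_bulk_import_progress: {bulk_import_progress:.1f}")
```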
(Diff for the remaining changed files not shown.)
