Auto detect PK column #28

Merged · 2 commits · Nov 27, 2024
doc/operation-class.md (23 changes: 12 additions & 11 deletions)
@@ -27,16 +27,17 @@ class MessageRetentionOperation(BaseOperation):
             INSERT INTO {self.source_db}.{self.destination_table}({self.source_columns})
             SELECT {self.source_columns}
             FROM {self.source_db}.{self.source_table} AS source
-            WHERE source.id BETWEEN {start_pk} AND {end_pk}
+            WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
         """

     def _get_not_imported_pks_query(self, start_pk, end_pk):
         return f'''
-            SELECT source.id FROM {self.source_db}.{self.source_table} AS source
-            LEFT JOIN {self.source_db}.{self.destination_table} AS dest ON source.id = dest.id
-            WHERE source.id BETWEEN {start_pk} AND {end_pk}
+            SELECT source.{self.pk_column} FROM {self.source_db}.{self.source_table} AS source
+            LEFT JOIN {self.source_db}.{self.destination_table} AS dest
+                ON source.{self.pk_column} = dest.{self.pk_column}
+            WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
-            AND dest.id IS NULL
+            AND dest.{self.pk_column} IS NULL
         '''
 ```

@@ -48,20 +49,20 @@ class CrossClusterMessageRetentionOperation(CrossClusterBaseOperation):
     def _select_batch_query(self, start_pk, end_pk):
         return f'''
             SELECT {self.source_columns} FROM {self.source_db}.{self.source_table}
-            WHERE id BETWEEN {start_pk} AND {end_pk}
+            WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
         '''

     def get_not_imported_pks(self, source_cursor, dest_cursor, start_pk, end_pk):
         source_cursor.execute(f'''
-            SELECT id FROM {self.source_db}.{self.source_table}
-            WHERE id BETWEEN {start_pk} AND {end_pk}
+            SELECT {self.pk_column} FROM {self.source_db}.{self.source_table}
+            WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
         ''')
         source_pks = [row[0] for row in source_cursor.fetchall()]
         dest_cursor.execute(f'''
-            SELECT id FROM {self.destination_db}.{self.destination_table}
-            WHERE id BETWEEN {start_pk} AND {end_pk}
+            SELECT {self.pk_column} FROM {self.destination_db}.{self.destination_table}
+            WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
         ''')
         dest_pks = [row[0] for row in dest_cursor.fetchall()]
@@ -89,7 +90,7 @@ class MessageRetentionOperation(BaseOperation):
             INSERT INTO {self.source_db}.{self.destination_table}({self.source_columns})
             SELECT {self.source_columns}
             FROM {self.source_db}.{self.source_table} AS source
-            WHERE source.id BETWEEN {start_pk} AND {end_pk}
+            WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL {self.operation_config.retention_days} DAY)
         """
 ```
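Since the initializer stores `pk_column` with backticks (see `src/sbosc/controller/initializer.py` below), the documented f-string queries end up with a properly quoted identifier. A quick illustration with a hypothetical column name:

```python
# Illustration of how the detected pk_column interpolates into the documented
# queries above; the column name and PK range here are hypothetical.
pk_column = "`message_id`"   # stored backtick-quoted by the initializer
start_pk, end_pk = 1, 2000
print(f"WHERE source.{pk_column} BETWEEN {start_pk} AND {end_pk}")
# -> WHERE source.`message_id` BETWEEN 1 AND 2000
```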
src/modules/redis/schema.py (3 changes: 2 additions & 1 deletion)
@@ -28,7 +28,8 @@ class Metadata(Hash):
     destination_db: str
     destination_table: str
     source_columns: str
-    max_id: int
+    pk_column: str
+    max_pk: int
     start_datetime: datetime
src/sbosc/controller/controller.py (12 changes: 6 additions & 6 deletions)
@@ -63,14 +63,14 @@ def create_bulk_import_chunks(self):
         self.redis_data.remove_all_chunks()

         metadata = self.redis_data.metadata
-        max_id = metadata.max_id
+        max_pk = metadata.max_pk

         # chunk_count is determined by min_chunk_size and max_chunk_count
         # Each chunk will have min_chunk_size rows and the number of chunks should not exceed max_chunk_count
         min_chunk_size = config.MIN_CHUNK_SIZE
         max_chunk_count = config.MAX_CHUNK_COUNT  # Number of chunks means max number of worker threads
-        chunk_count = min(max_id // min_chunk_size, max_chunk_count)
-        chunk_size = max_id // chunk_count
+        chunk_count = min(max_pk // min_chunk_size, max_chunk_count)
+        chunk_size = max_pk // chunk_count

         # Create chunks
         # Each chunk will have a range of primary key values [start_pk, end_pk]
@@ -79,7 +79,7 @@ def create_bulk_import_chunks(self):
             start_pk = i * chunk_size + 1
             end_pk = (i + 1) * chunk_size
             if i == chunk_count - 1:
-                end_pk = max_id
+                end_pk = max_pk

             chunk_id = f"{self.migration_id}-{i}"
             chunk_info = self.redis_data.get_chunk_info(chunk_id)
@@ -112,7 +112,7 @@ def create_bulk_import_chunks(self):
         self.redis_data.set_current_stage(Stage.BULK_IMPORT)
         self.slack.send_message(
             subtitle="Bulk import started",
-            message=f"Max id: {max_id}\n"
+            message=f"Max PK: {max_pk}\n"
                     f"Chunk count: {chunk_count}\n"
                     f"Chunk size: {chunk_size}\n"
                     f"Batch size: {config.MIN_BATCH_SIZE}\n"
@@ -343,7 +343,7 @@ def swap_tables(self):
            old_source_table = f"{metadata.source_db}.{self.redis_data.old_source_table}"
            cursor.execute(f"RENAME TABLE {source_table} TO {old_source_table}")
            after_rename_table_timestamp = time.time()
-           cursor.execute(f"SELECT MAX(id) FROM {old_source_table}")
+           cursor.execute(f"SELECT MAX({metadata.pk_column}) FROM {old_source_table}")
            final_max_id = cursor.fetchone()[0]

        with self.validator.migration_operation.override_source_table(self.redis_data.old_source_table):
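To make the chunking arithmetic in `create_bulk_import_chunks` easier to follow in isolation, here is a small standalone sketch of the logic above; the numbers passed in are illustrative, not the project's defaults.

```python
# Standalone sketch of the chunk-splitting logic above; the values below are
# illustrative, not config.MIN_CHUNK_SIZE / config.MAX_CHUNK_COUNT defaults.
def split_into_chunks(max_pk: int, min_chunk_size: int, max_chunk_count: int):
    chunk_count = min(max_pk // min_chunk_size, max_chunk_count)
    chunk_size = max_pk // chunk_count
    chunks = []
    for i in range(chunk_count):
        start_pk = i * chunk_size + 1
        # The last chunk absorbs the remainder up to max_pk
        end_pk = (i + 1) * chunk_size if i < chunk_count - 1 else max_pk
        chunks.append((start_pk, end_pk))
    return chunks

print(split_into_chunks(max_pk=10_500, min_chunk_size=1_000, max_chunk_count=8))
# [(1, 1312), (1313, 2624), ..., (9185, 10500)] -- 8 chunks
```

Note that chunk boundaries are PK values rather than row counts, so a chunk covering a range with gaps in the auto-increment sequence contains fewer actual rows than `chunk_size`.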
src/sbosc/controller/initializer.py (21 changes: 18 additions & 3 deletions)
@@ -174,10 +174,25 @@ def fetch_metadata(self, redis_data):
            metadata.source_columns = cursor.fetchone()[0]
            self.logger.info("Saved source column schema to Redis")

+            # Get pk column
+            cursor.execute(f'''
+                SELECT COLUMN_NAME FROM information_schema.COLUMNS
+                WHERE TABLE_SCHEMA = '{metadata.source_db}' AND TABLE_NAME = '{metadata.source_table}'
+                AND COLUMN_KEY = 'PRI' AND EXTRA LIKE '%auto_increment%'
+            ''')
+            if cursor.rowcount == 0:
+                raise Exception("Auto increment primary key column not found")
+            metadata.pk_column = f"`{cursor.fetchone()[0]}`"
+            self.logger.info("Saved primary key column to Redis")
+
            # Get max id
-            cursor.execute("SELECT MAX(id) FROM %s.%s" % (metadata.source_db, metadata.source_table))
-            max_id = cursor.fetchone()[0]
-            metadata.max_id = max_id
+            cursor.execute('''
+                SELECT MAX(%s) FROM %s.%s
+            ''' % (metadata.pk_column, metadata.source_db, metadata.source_table))
+            max_pk = cursor.fetchone()[0]
+            if max_pk is None:
+                raise Exception("No data in source table")
+            metadata.max_pk = max_pk
            self.logger.info("Saved total rows to Redis")

            metadata.start_datetime = datetime.now()
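The detection added above boils down to a single `information_schema` lookup. A minimal standalone version might look like the sketch below; a DB-API cursor (e.g. PyMySQL) is assumed, and bound parameters are used here instead of the f-string interpolation in the actual patch.

```python
# Minimal sketch of the auto-increment PK detection introduced above, written
# as a standalone helper. Assumes a DB-API cursor (e.g. PyMySQL); literal '%'
# in the query is doubled because parameters are bound.
def detect_auto_increment_pk(cursor, db: str, table: str) -> str:
    cursor.execute(
        """
        SELECT COLUMN_NAME FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
          AND COLUMN_KEY = 'PRI' AND EXTRA LIKE '%%auto_increment%%'
        """,
        (db, table),
    )
    row = cursor.fetchone()
    if row is None:
        raise Exception("Auto increment primary key column not found")
    return f"`{row[0]}`"  # backtick-quoted, as stored in metadata.pk_column
```

As in the patch, tables without an auto-increment primary key are rejected up front rather than falling back to a guessed column.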
src/sbosc/controller/validator.py (10 changes: 6 additions & 4 deletions)
@@ -83,8 +83,8 @@ def bulk_import_validation(self):
         metadata = self.redis_data.metadata
         range_queue = Queue()
         start_pk = 0
-        while start_pk <= metadata.max_id:
-            range_queue.put((start_pk, min(start_pk + self.bulk_import_batch_size, metadata.max_id)))
+        while start_pk <= metadata.max_pk:
+            range_queue.put((start_pk, min(start_pk + self.bulk_import_batch_size, metadata.max_pk)))
             start_pk += self.bulk_import_batch_size + 1
         failed_pks = []
@@ -153,13 +153,15 @@ def __execute_apply_dml_events_validation_query(
         if event_pks:
             event_pks_str = ','.join([str(pk) for pk in event_pks])
             dest_cursor.execute(f'''
-                SELECT id FROM {metadata.destination_db}.{metadata.destination_table} WHERE id IN ({event_pks_str})
+                SELECT {metadata.pk_column} FROM {metadata.destination_db}.{metadata.destination_table}
+                WHERE {metadata.pk_column} IN ({event_pks_str})
             ''')
             not_deleted_pks = set([row[0] for row in dest_cursor.fetchall()])
             if dest_cursor.rowcount > 0:
                 # Check if deleted pks are reinserted
                 source_cursor.execute(f'''
-                    SELECT id FROM {metadata.source_db}.{metadata.source_table} WHERE id IN ({event_pks_str})
+                    SELECT {metadata.pk_column} FROM {metadata.source_db}.{metadata.source_table}
+                    WHERE {metadata.pk_column} IN ({event_pks_str})
                 ''')
                 reinserted_pks = set([row[0] for row in source_cursor.fetchall()])
                 if reinserted_pks:
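For reference, the batch-range generation touched in `bulk_import_validation` above can be exercised on its own; this is a sketch with illustrative values, not the project's configured batch size.

```python
# Standalone sketch of how bulk_import_validation splits [0, max_pk] into
# validation ranges; max_pk and batch_size below are illustrative.
from queue import Queue

def build_validation_ranges(max_pk: int, batch_size: int) -> Queue:
    range_queue = Queue()
    start_pk = 0
    while start_pk <= max_pk:
        range_queue.put((start_pk, min(start_pk + batch_size, max_pk)))
        start_pk += batch_size + 1
    return range_queue

q = build_validation_ranges(max_pk=25, batch_size=10)
while not q.empty():
    print(q.get())
# (0, 10)
# (11, 21)
# (22, 25)
```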
src/sbosc/monitor/monitor.py (4 changes: 2 additions & 2 deletions)
@@ -306,8 +306,8 @@ def check_migration_status(self):
             if last_pk_inserted and last_pk_inserted >= chunk_info.start_pk:
                 inserted_rows += last_pk_inserted - chunk_info.start_pk

-        if self.redis_data.metadata.max_id:
-            bulk_import_progress = inserted_rows / self.redis_data.metadata.max_id * 100
+        if self.redis_data.metadata.max_pk:
+            bulk_import_progress = inserted_rows / self.redis_data.metadata.max_pk * 100
             self.metric_sender.submit('sb_osc_bulk_import_progress', bulk_import_progress)

         self.submit_event_handler_timestamps()
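Worth noting for readers of this metric: progress is derived from PK positions (last inserted PK minus each chunk's start PK) rather than counted rows, so with gaps in the auto-increment sequence it is an approximation. A toy calculation with hypothetical numbers:

```python
# Toy illustration of the progress metric above; all values are hypothetical.
inserted_rows = 7_500   # sum of (last_pk_inserted - chunk_info.start_pk) over chunks
max_pk = 10_000         # metadata.max_pk captured by the initializer
bulk_import_progress = inserted_rows / max_pk * 100
print(f"sb_osc_bulk_import_progress = {bulk_import_progress:.1f}%")  # 75.0%
```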