Release v1.2.0 #37

Merged: 24 commits, merged Jan 21, 2025

Commits
72b0b1b
Auto detect pk column
jjh-kim Sep 4, 2024
57b67c7
lint fix
jjh-kim Nov 27, 2024
20e2783
Merge pull request #28 from sendbird/feature/auto-detect-pk-column
jjh-kim Nov 27, 2024
1347c12
Add get_max_pk to operation class
jjh-kim Nov 27, 2024
0fd6bd1
lint fix
jjh-kim Nov 27, 2024
0ccb8b8
Merge pull request #29 from sendbird/feature/move-get-mak-pk-to-opera…
jjh-kim Nov 27, 2024
a0b34d2
Support PK column without AUTO_INCREMENT
jjh-kim Nov 27, 2024
1d6ca27
change error message
jjh-kim Nov 27, 2024
4412b1a
Add note on use_batch_size_multiplier
jjh-kim Nov 27, 2024
d27e3a7
fix comment
jjh-kim Nov 27, 2024
3e4e15a
remove TODO
jjh-kim Nov 27, 2024
d6e0f7a
Merge pull request #30 from sendbird/feature/support-non-auto-increme…
jjh-kim Nov 27, 2024
7ecae91
Add disable eventhandler option
jjh-kim Jan 9, 2025
a7bef81
Merge pull request #31 from sendbird/feature/disable-eventhandler
jjh-kim Jan 9, 2025
94f43e9
Fix bug when adding index with CrossClusterOperation
jjh-kim Jan 9, 2025
45d5ae1
Fix connection ping bug
jjh-kim Jan 9, 2025
085eb94
Merge pull request #32 from sendbird/bugfix/cross-cluster-add-index
jjh-kim Jan 9, 2025
98de164
Use multiprocessing for eventhandler
jjh-kim Jan 9, 2025
82d3643
Merge pull request #33 from sendbird/bugfix/connection-ping
jjh-kim Jan 9, 2025
0ddfaea
Merge pull request #34 from sendbird/feature/eventhandler-multiproces…
jjh-kim Jan 9, 2025
3a00ac4
Allow _get_not_imported_pks_query to be overriden
jjh-kim Jan 9, 2025
2b872df
Merge pull request #35 from sendbird/feature/not-imported-pks-query-o…
jjh-kim Jan 9, 2025
60944bc
Improve redis depends_on
jjh-kim Jan 9, 2025
4c89637
Merge pull request #36 from sendbird/feature/docker-compose-dependency
jjh-kim Jan 9, 2025
8 changes: 7 additions & 1 deletion deploy/compose/docker-compose.yml
@@ -12,7 +12,8 @@ services:
command: ["python", "-m", "sbosc.controller.main"]
restart: always
depends_on:
- redis
redis:
condition: service_healthy

eventhandler:
<<: *component-base
@@ -49,6 +50,11 @@ services:
volumes:
- redis-data:/data
- ./redis.conf:/usr/local/etc/redis/redis.conf
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5

volumes:
redis-data:
8 changes: 8 additions & 0 deletions doc/config.md
@@ -9,6 +9,11 @@ If you set this parameter to `True`, SB-OSC will skip the bulk import stage and
### disable_apply_dml_events
If you set this parameter to `True`, SB-OSC will pause before `apply_dml_events` stage. This is useful when you have additional steps to perform manually before applying DML events.

### disable_eventhandler
If you set this parameter to `True`, SB-OSC will disable the eventhandler, which means it will not process binlog events. Only the bulk import will be performed.

After the `bulk_import_validation` stage, it will move directly to the `done` stage. As a result, the `add_index` stage is skipped, since the `apply_dml_events` stage will not be executed.
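
The following is a minimal sketch of this stage flow; the function name is illustrative only, and the actual branching lives in `controller.py` and `initializer.py` as shown in this PR's diff:

```python
# Illustrative sketch, not part of the SB-OSC API: mirrors the stage branching
# added in controller.py and initializer.py in this PR.
def next_stage_after_validation(disable_eventhandler: bool) -> str:
    # When the eventhandler is disabled, binlog events are never replayed,
    # so the migration jumps straight to DONE after bulk import validation,
    # skipping APPLY_DML_EVENTS (and therefore ADD_INDEX).
    return "DONE" if disable_eventhandler else "APPLY_DML_EVENTS"
```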


## Chunk
### max_chunk_count & min_chunk_size
@@ -34,3 +39,6 @@ These parameters control insert throughput of SB-OSC. `batch_size` and `thread_c

`LIMIT batch_size` is applied to the next query to prevent inserting too many rows at once.

**Note:** This option uses `cursor.lastrowid` to set `last_inserted_pk`, which returns a non-zero value only when the table has an **AUTO_INCREMENT** column
([MySQL documentation](https://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlcursor-lastrowid.html)).
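
As a minimal illustration of this limitation (assuming a PyMySQL-style driver; the connection parameters and table names below are hypothetical):

```python
# Minimal sketch, not SB-OSC code: demo_db and its tables are hypothetical.
import pymysql

conn = pymysql.connect(host="127.0.0.1", user="root", password="", database="demo_db")
with conn.cursor() as cursor:
    # Table with an AUTO_INCREMENT primary key: lastrowid is the generated id.
    cursor.execute("INSERT INTO with_auto_inc (val) VALUES ('a')")
    print(cursor.lastrowid)  # non-zero generated id

    # Table without AUTO_INCREMENT: lastrowid is 0, so the multiplier logic
    # cannot derive the last inserted PK from it.
    cursor.execute("INSERT INTO no_auto_inc (id, val) VALUES (10, 'a')")
    print(cursor.lastrowid)  # 0
```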

23 changes: 12 additions & 11 deletions doc/operation-class.md
@@ -27,16 +27,17 @@ class MessageRetentionOperation(BaseOperation):
INSERT INTO {self.source_db}.{self.destination_table}({self.source_columns})
SELECT {self.source_columns}
FROM {self.source_db}.{self.source_table} AS source
WHERE source.id BETWEEN {start_pk} AND {end_pk}
WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
"""
def _get_not_imported_pks_query(self, start_pk, end_pk):
return f'''
SELECT source.id FROM {self.source_db}.{self.source_table} AS source
LEFT JOIN {self.source_db}.{self.destination_table} AS dest ON source.id = dest.id
WHERE source.id BETWEEN {start_pk} AND {end_pk}
SELECT source.{self.pk_column} FROM {self.source_db}.{self.source_table} AS source
LEFT JOIN {self.source_db}.{self.destination_table} AS dest
ON source.{self.pk_column} = dest.{self.pk_column}
WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
AND dest.id IS NULL
AND dest.{self.pk_column} IS NULL
'''
```

@@ -48,20 +49,20 @@ class CrossClusterMessageRetentionOperation(CrossClusterBaseOperation):
def _select_batch_query(self, start_pk, end_pk):
return f'''
SELECT {self.source_columns} FROM {self.source_db}.{self.source_table}
WHERE id BETWEEN {start_pk} AND {end_pk}
WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk}
AND ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
'''

def get_not_imported_pks(self, source_cursor, dest_cursor, start_pk, end_pk):
source_cursor.execute(f'''
SELECT id FROM {self.source_db}.{self.source_table}
WHERE id BETWEEN {start_pk} AND {end_pk}
SELECT {self.pk_column} FROM {self.source_db}.{self.source_table}
WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk}
AND ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
''')
source_pks = [row[0] for row in source_cursor.fetchall()]
dest_cursor.execute(f'''
SELECT id FROM {self.destination_db}.{self.destination_table}
WHERE id BETWEEN {start_pk} AND {end_pk}
SELECT {self.pk_column} FROM {self.destination_db}.{self.destination_table}
WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk}
AND ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
''')
dest_pks = [row[0] for row in dest_cursor.fetchall()]
@@ -89,7 +90,7 @@ class MessageRetentionOperation(BaseOperation):
INSERT INTO {self.source_db}.{self.destination_table}({self.source_columns})
SELECT {self.source_columns}
FROM {self.source_db}.{self.source_table} AS source
WHERE source.id BETWEEN {start_pk} AND {end_pk}
WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
AND source.ts > DATE_SUB(NOW(), INTERVAL {self.operation_config.retention_days} DAY)
"""
```
1 change: 1 addition & 0 deletions src/config/config.py
@@ -84,6 +84,7 @@ class Config:
USE_BATCH_SIZE_MULTIPLIER = False

# EventHandler config
DISABLE_EVENTHANDLER = False
EVENTHANDLER_THREAD_COUNT = 4
EVENTHANDLER_THREAD_TIMEOUT_IN_SECONDS = 300
INIT_BINLOG_FILE: str = None
10 changes: 5 additions & 5 deletions src/modules/db.py
@@ -74,7 +74,10 @@ def cursor(self, cursorclass=None):
def ping(self):
if not self._conn:
self._conn = self.connect()
self._conn.ping()
try:
self._conn.ping()
except OperationalError:
self._conn = self.connect()

def close(self):
if self._conn:
@@ -104,10 +107,7 @@ def get_connection(self):

yield conn

try:
conn.ping()
except OperationalError:
conn = Connection(self.endpoint)
conn.ping()
if self.free_connections.full():
raise Exception("Connection pool full")
else:
3 changes: 2 additions & 1 deletion src/modules/redis/schema.py
@@ -28,7 +28,8 @@ class Metadata(Hash):
destination_db: str
destination_table: str
source_columns: str
max_id: int
pk_column: str
max_pk: int
start_datetime: datetime


75 changes: 39 additions & 36 deletions src/sbosc/controller/controller.py
@@ -52,7 +52,6 @@ def start(self):
if action:
action()

# TODO: Add Redis data validation if needed
time.sleep(self.interval)

# Close db connection
@@ -63,14 +62,14 @@ def create_bulk_import_chunks(self):
self.redis_data.remove_all_chunks()

metadata = self.redis_data.metadata
max_id = metadata.max_id
max_pk = metadata.max_pk

# chunk_count is determined by min_chunk_size and max_chunk_count
# Each chunk will have min_chunk_size rows and the number of chunks should not exceed max_chunk_count
min_chunk_size = config.MIN_CHUNK_SIZE
max_chunk_count = config.MAX_CHUNK_COUNT # Number of chunks means max number of worker threads
chunk_count = min(max_id // min_chunk_size, max_chunk_count)
chunk_size = max_id // chunk_count
chunk_count = min(max_pk // min_chunk_size, max_chunk_count)
chunk_size = max_pk // chunk_count

# Create chunks
# Each chunk will have a range of primary key values [start_pk, end_pk]
@@ -79,7 +78,7 @@ def create_bulk_import_chunks(self):
start_pk = i * chunk_size + 1
end_pk = (i + 1) * chunk_size
if i == chunk_count - 1:
end_pk = max_id
end_pk = max_pk

chunk_id = f"{self.migration_id}-{i}"
chunk_info = self.redis_data.get_chunk_info(chunk_id)
@@ -112,7 +111,7 @@ def create_bulk_import_chunks(self):
self.redis_data.set_current_stage(Stage.BULK_IMPORT)
self.slack.send_message(
subtitle="Bulk import started",
message=f"Max id: {max_id}\n"
message=f"Max PK: {max_pk}\n"
f"Chunk count: {chunk_count}\n"
f"Chunk size: {chunk_size}\n"
f"Batch size: {config.MIN_BATCH_SIZE}\n"
@@ -166,7 +165,10 @@ def validate_bulk_import(self):
self.redis_data.set_current_stage(Stage.BULK_IMPORT_VALIDATION_FAILED)
self.slack.send_message(message="Bulk import validation failed", color="danger")
else:
self.redis_data.set_current_stage(Stage.APPLY_DML_EVENTS)
if not config.DISABLE_EVENTHANDLER:
self.redis_data.set_current_stage(Stage.APPLY_DML_EVENTS)
else:
self.redis_data.set_current_stage(Stage.DONE)
self.slack.send_message(message="Bulk import validation succeeded", color="good")
except StopFlagSet:
return
@@ -213,45 +215,46 @@ def add_index(self):
finished_all_creation = False
while not self.stop_flag:
finished_creation = False
with self.db.cursor() as cursor:
cursor: Cursor
with self.db.cursor(role='reader') as source_cursor:
source_cursor: Cursor

index_info = None
cursor.execute(f'''
source_cursor.execute(f'''
SELECT index_name FROM {config.SBOSC_DB}.index_creation_status
WHERE migration_id = %s AND ended_at IS NULL AND started_at IS NOT NULL
''', (self.migration_id,))

if cursor.rowcount > 0:
index_names = [row[0] for row in cursor.fetchall()]
if source_cursor.rowcount > 0:
index_names = [row[0] for row in source_cursor.fetchall()]
self.slack.send_message(
subtitle="Found unfinished index creation", message=f"Indexes: {index_names}", color="warning")

while True:
if self.stop_flag:
return
cursor.execute(f'''
SELECT DISTINCT database_name, table_name, index_name FROM mysql.innodb_index_stats
WHERE database_name = %s AND table_name = %s
AND index_name IN ({','.join(['%s'] * len(index_names))})
''', [metadata.destination_db, metadata.destination_table] + index_names)
if cursor.rowcount == len(index_names):
finished_creation = True
break
with self.db.cursor(host='dest', role='reader') as dest_cursor:
dest_cursor.execute(f'''
SELECT DISTINCT database_name, table_name, index_name FROM mysql.innodb_index_stats
WHERE database_name = %s AND table_name = %s
AND index_name IN ({','.join(['%s'] * len(index_names))})
''', [metadata.destination_db, metadata.destination_table] + index_names)
if dest_cursor.rowcount == len(index_names):
finished_creation = True
break
self.logger.info("Waiting for index creation to finish")
time.sleep(60)

else:
cursor.execute(f'''
source_cursor.execute(f'''
SELECT index_name, index_columns, is_unique FROM {config.SBOSC_DB}.index_creation_status
WHERE migration_id = %s AND ended_at IS NULL LIMIT {config.INDEX_CREATED_PER_QUERY}
''', (self.migration_id,))

if cursor.rowcount == 0:
if source_cursor.rowcount == 0:
finished_all_creation = True
break

index_info = cursor.fetchall()
index_info = source_cursor.fetchall()
index_names = [index_name for index_name, *_ in index_info]

if index_info and not finished_creation:
@@ -260,30 +263,30 @@ def add_index(self):

# update ended_at
started_at = datetime.now()
with self.db.cursor() as cursor:
cursor: Cursor
cursor.executemany(f'''
with self.db.cursor() as source_cursor:
source_cursor: Cursor
source_cursor.executemany(f'''
UPDATE {config.SBOSC_DB}.index_creation_status SET started_at = %s
WHERE migration_id = %s AND index_name = %s
''', [(started_at, self.migration_id, index_name) for index_name in index_names])

# add index
with self.db.cursor(host='dest') as cursor:
cursor: Cursor
with self.db.cursor(host='dest') as dest_cursor:
dest_cursor: Cursor

# set session variables
if config.INNODB_DDL_BUFFER_SIZE is not None:
cursor.execute(f"SET SESSION innodb_ddl_buffer_size = {config.INNODB_DDL_BUFFER_SIZE}")
dest_cursor.execute(f"SET SESSION innodb_ddl_buffer_size = {config.INNODB_DDL_BUFFER_SIZE}")
self.logger.info(f"Set innodb_ddl_buffer_size to {config.INNODB_DDL_BUFFER_SIZE}")
if config.INNODB_DDL_THREADS is not None:
cursor.execute(f"SET SESSION innodb_ddl_threads = {config.INNODB_DDL_THREADS}")
dest_cursor.execute(f"SET SESSION innodb_ddl_threads = {config.INNODB_DDL_THREADS}")
self.logger.info(f"Set innodb_ddl_threads to {config.INNODB_DDL_THREADS}")
if config.INNODB_PARALLEL_READ_THREADS is not None:
cursor.execute(
dest_cursor.execute(
f"SET SESSION innodb_parallel_read_threads = {config.INNODB_PARALLEL_READ_THREADS}")
self.logger.info(f"Set innodb_parallel_read_threads to {config.INNODB_PARALLEL_READ_THREADS}")

cursor.execute(f'''
dest_cursor.execute(f'''
ALTER TABLE {metadata.destination_db}.{metadata.destination_table}
{', '.join([
f"ADD{' UNIQUE' if is_unique else ''} INDEX {index_name} ({index_columns})"
@@ -296,9 +299,9 @@ def add_index(self):
if finished_creation:
# update ended_at
ended_at = datetime.now()
with self.db.cursor() as cursor:
cursor: Cursor
cursor.executemany(f'''
with self.db.cursor() as source_cursor:
source_cursor: Cursor
source_cursor.executemany(f'''
UPDATE {config.SBOSC_DB}.index_creation_status SET ended_at = %s
WHERE migration_id = %s AND index_name = %s
''', [(ended_at, self.migration_id, index_name) for index_name in index_names])
@@ -343,7 +346,7 @@ def swap_tables(self):
old_source_table = f"{metadata.source_db}.{self.redis_data.old_source_table}"
cursor.execute(f"RENAME TABLE {source_table} TO {old_source_table}")
after_rename_table_timestamp = time.time()
cursor.execute(f"SELECT MAX(id) FROM {old_source_table}")
cursor.execute(f"SELECT MAX({metadata.pk_column}) FROM {old_source_table}")
final_max_id = cursor.fetchone()[0]

with self.validator.migration_operation.override_source_table(self.redis_data.old_source_table):
28 changes: 23 additions & 5 deletions src/sbosc/controller/initializer.py
@@ -174,14 +174,32 @@ def fetch_metadata(self, redis_data):
metadata.source_columns = cursor.fetchone()[0]
self.logger.info("Saved source column schema to Redis")

# Get max id
cursor.execute("SELECT MAX(id) FROM %s.%s" % (metadata.source_db, metadata.source_table))
max_id = cursor.fetchone()[0]
metadata.max_id = max_id
# Get pk column
cursor.execute(f'''
SELECT COLUMN_NAME FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = '{metadata.source_db}' AND TABLE_NAME = '{metadata.source_table}'
AND COLUMN_KEY = 'PRI' AND DATA_TYPE IN ('int', 'bigint')
''')
if cursor.rowcount == 0:
raise Exception("Integer primary key column not found")
metadata.pk_column = f"`{cursor.fetchone()[0]}`"
self.logger.info("Saved primary key column to Redis")

# Get max PK
cursor.execute('''
SELECT MAX(%s) FROM %s.%s
''' % (metadata.pk_column, metadata.source_db, metadata.source_table))
max_pk = cursor.fetchone()[0]
if max_pk is None:
raise Exception("No data in source table")
metadata.max_pk = max_pk
self.logger.info("Saved total rows to Redis")

metadata.start_datetime = datetime.now()
redis_data.set_current_stage(Stage.START_EVENT_HANDLER)
if not config.DISABLE_EVENTHANDLER:
redis_data.set_current_stage(Stage.START_EVENT_HANDLER)
else:
redis_data.set_current_stage(Stage.BULK_IMPORT_CHUNK_CREATION)

def init_migration(self):
if not self.check_database_setup():