Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Background SSTable Compaction Support #22

Draft
wants to merge 1 commit into
base: refactor/cfoundation-setup-process
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 204 additions & 0 deletions samples/compaction_operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
#!/usr/bin/env python3
"""
TidesDB Background Compaction with Enhanced Monitoring
"""
import os
import sys
import time
import glob
import logging
from pathlib import Path
from typing import Optional, List, Dict
from collections import defaultdict

from tidesdb import (
TidesDB,
TidesDBCompressionAlgo,
TidesDBMemtableDS,
)

# Configure root logging once at import time so every message from this
# sample carries a timestamp and level.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger, keyed by module name per stdlib convention.
logger = logging.getLogger(__name__)

class CompactionMonitor:
    """Drives a TidesDB compaction demo.

    Fills a column family with enough data to produce many SSTables,
    snapshots the on-disk state to a text file, then starts background
    compaction and monitors progress until a target SSTable count is hit.
    """

    def __init__(self, db_path: str):
        # Directory the TidesDB instance lives in.
        self.db_path = db_path
        self.db: Optional[TidesDB] = None
        self.cf_name = "compaction_test_cf"
        # Number of records inserted by setup_database(); bounds key scans
        # in save_current_state().
        self.total_records = 0

    def analyze_sstables(self) -> Dict:
        """Scan the column family directory and summarize its SSTable files.

        Returns:
            Dict with:
                'total_count': number of sstable_*.sst files found,
                'size_distribution': file count keyed by size in whole MiB,
                'files': sorted list of SSTable basenames.
        """
        cf_path = os.path.join(self.db_path, self.cf_name)
        sst_files = glob.glob(os.path.join(cf_path, "sstable_*.sst"))

        analysis = {
            'total_count': len(sst_files),
            'size_distribution': defaultdict(int),
            'files': sorted(os.path.basename(f) for f in sst_files)
        }

        for sst in sst_files:
            size = os.path.getsize(sst)
            analysis['size_distribution'][size // (1024 * 1024)] += 1

        return analysis

    def save_current_state(self, filename: str, max_records: int) -> None:
        """Save a snapshot of the SSTable layout and readable records.

        Args:
            filename: Snapshot file name, written inside the column family
                directory.
            max_records: Upper bound of the key range to probe
                (keys follow the "key:%08d" pattern used at insertion).
        """
        try:
            cf_path = os.path.join(self.db_path, self.cf_name)
            output_path = os.path.join(cf_path, filename)
            analysis = self.analyze_sstables()
            sst_names = analysis['files']

            with open(output_path, 'w') as f:
                # Fix: header previously rendered the literal "(unknown)"
                # instead of the snapshot's filename.
                f.write(f"Database state snapshot - {filename}\n")
                f.write("-" * 50 + "\n\n")

                f.write("Current SST Files:\n")
                for sst_file in sst_names:
                    f.write(f"- {sst_file}\n")
                f.write("\n" + "-" * 50 + "\n\n")

                count = 0
                for i in range(max_records):
                    key = f"key:{i:08d}".encode()
                    try:
                        value = self.db.get(self.cf_name, key)
                        if value:
                            count += 1
                            # NOTE(review): this round-robins keys over the
                            # file list for display only; it does NOT reflect
                            # which SSTable actually holds the key.
                            # Fix: guard against modulo-by-zero when no
                            # SSTables exist.
                            if sst_names:
                                current_sst = sst_names[count % len(sst_names)]
                                f.write(f"Key: {key.decode()}, Value: {value.decode()[:50]}... | In SST: {current_sst}\n")
                    except Exception:
                        # Missing/unreadable keys are expected mid-compaction;
                        # skip them and keep scanning.
                        continue

                f.write("\n" + "-" * 50 + "\n")
                f.write(f"Total records found: {count}\n")
                f.write(f"Total SST files: {len(sst_names)}\n")

            # Fix: log message previously said "(unknown)" instead of the file.
            logger.info(f"Saved database state to {filename} - {count} records written")

        except Exception as e:
            logger.error(f"Error saving state to {filename}: {str(e)}")

    def setup_database(self) -> None:
        """Open the DB, create the column family and insert data until at
        least 10 SSTables exist (or the batch budget is exhausted), then
        snapshot the pre-compaction state.

        Raises:
            Exception: Re-raised after cleanup() if any step fails.
        """
        try:
            logger.info("Opening database at: %s", self.db_path)
            self.db = TidesDB.open(self.db_path)

            logger.info("Creating column family: %s", self.cf_name)
            # Small 4 MiB flush threshold so inserts spill to disk quickly
            # and many SSTables accumulate for the compaction demo.
            self.db.create_column_family(
                self.cf_name,
                4 * 1024 * 1024,
                6,
                0.5,
                True,
                TidesDBCompressionAlgo.COMPRESS_SNAPPY,
                True,
                TidesDBMemtableDS.SKIP_LIST
            )

            batch_size = 500
            total_batches = 20
            value_size = 2000  # repetitions of the value string per record
            records_inserted = 0

            logger.info("Starting data insertion for 10 SST files...")

            for batch in range(total_batches):
                for i in range(batch_size):
                    record_num = batch * batch_size + i
                    key = f"key:{record_num:08d}".encode()
                    value = f"value:{record_num:08d}".encode() * value_size
                    self.db.put(self.cf_name, key, value, ttl=-1)
                    records_inserted += 1

                analysis = self.analyze_sstables()
                current_count = analysis['total_count']
                logger.info("SST files created: %d", current_count)

                if current_count >= 10:
                    self.total_records = records_inserted
                    logger.info(f"Reached target of 10 SST files with {records_inserted} records")
                    break
            else:
                # Fix: total_records previously stayed 0 when the SSTable
                # target was never reached, so the snapshot scanned no keys.
                self.total_records = records_inserted

            analysis = self.analyze_sstables()
            logger.info("Data insertion complete - Total SSTables: %d", analysis['total_count'])

            # Brief pause so in-flight flushes settle before snapshotting.
            time.sleep(1)
            self.save_current_state("before_compaction.txt", self.total_records)

        except Exception as e:
            logger.error("Failed to setup database: %s", str(e))
            self.cleanup()
            raise

    def run_compaction(self) -> bool:
        """Start background compaction and poll until the SSTable count
        drops to the target, or a 30 s timeout elapses.

        Returns:
            True if the target count was reached, False on timeout or error.
        """
        try:
            initial_analysis = self.analyze_sstables()
            logger.info("Starting compaction monitoring:")
            logger.info("Initial state: %d SSTables", initial_analysis['total_count'])

            expected_count = 5
            logger.info("Target count: 5 SSTables")

            logger.info("Starting background compaction process...")
            self.db.start_background_compaction(
                self.cf_name,
                interval_seconds=1,
                min_sstables=2
            )

            start_time = time.time()
            last_count = initial_analysis['total_count']

            while time.time() - start_time < 30:
                time.sleep(1)

                current_analysis = self.analyze_sstables()
                current_count = current_analysis['total_count']

                if current_count != last_count:
                    logger.info("Compaction progress: %d -> %d SSTables", last_count, current_count)
                    last_count = current_count

                if current_count <= expected_count:
                    # Let the final merge settle before snapshotting.
                    time.sleep(1)
                    self.save_current_state("after_compaction.txt", self.total_records)
                    logger.info("Target achieved - 5 SST files reached!")
                    return True

            # Fix: previously returned True unconditionally, even when the
            # 30 s window expired without reaching the target.
            logger.warning("Compaction timed out with %d SSTables remaining", last_count)
            return False

        except Exception as e:
            logger.error(f"Error during compaction: {str(e)}")
            return False

    def cleanup(self) -> None:
        """Close the database handle if it is open; idempotent."""
        try:
            if hasattr(self, 'db') and self.db is not None:
                logger.info("Closing database")
                self.db.close()
                self.db = None
        except Exception as e:
            logger.error("Error closing database: %s", str(e))


def main():
    """Entry point: build a monitor over ./compaction_output, seed the
    database, then run the compaction demo, cleaning up on every path."""
    target_dir = Path.cwd() / "compaction_output"
    monitor = CompactionMonitor(str(target_dir))
    try:
        monitor.setup_database()
        monitor.run_compaction()
    except Exception as exc:
        logger.error("Monitor failed: %s", str(exc))
        sys.exit(1)
    finally:
        # Always release the database handle, even after a failure.
        monitor.cleanup()


if __name__ == "__main__":
    main()
120 changes: 120 additions & 0 deletions tests/test_compaction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#!/usr/bin/env python3
"""
Test suite for TidesDB background compaction.
"""
import unittest
import time
import tempfile
import shutil
import logging
from pathlib import Path
from tidesdb import (
TidesDB,
TidesDBCompressionAlgo,
TidesDBMemtableDS,
)

# Configure root logging once at import time so test output is timestamped.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger, keyed by module name per stdlib convention.
logger = logging.getLogger(__name__)

class TestBackgroundCompaction(unittest.TestCase):
def setUp(self):
"""Set up test database for background compaction."""
logger.info("Setting up test environment for compaction")
self.test_dir = tempfile.mkdtemp()
logger.info("Created temporary directory: %s", self.test_dir)
self.db = None
self.cf_name = "compaction_test_cf"

try:
logger.info("Opening database")
self.db = TidesDB.open(self.test_dir)

logger.info("Creating column family: %s", self.cf_name)

self.db.create_column_family(
self.cf_name,
64 * 1024 * 1024,
12,
0.24,
True,
TidesDBCompressionAlgo.COMPRESS_SNAPPY,
True,
TidesDBMemtableDS.SKIP_LIST
)
logger.info("Test environment setup completed successfully")
except Exception as e:
logger.error("Failed to setup test environment: %s", str(e))
self.tearDown()
raise

def tearDown(self):
"""Clean up test resources."""
logger.info("Cleaning up test resources")
if self.db:
try:
logger.info("Closing database")
self.db.close()
except Exception as e:
logger.error("Error closing database: %s", str(e))

logger.info("Removing temporary directory: %s", self.test_dir)
shutil.rmtree(self.test_dir, ignore_errors=True)
logger.info("Cleanup completed")

def test_background_compaction(self):
"""Test the background compaction process."""
logger.info("Starting background compaction test")

logger.info("Inserting data to trigger compaction")
batch_size = 500
total_batches = 5

for batch in range(total_batches):
for i in range(batch_size):
key = f"key:{batch * batch_size + i:08d}".encode()
value = f"value:{batch * batch_size + i:08d}".encode() * 2000
self.db.put(self.cf_name, key, value, ttl=-1)

logger.info(f"Inserted {batch_size * total_batches} records")

logger.info("Starting background compaction")
self.db.start_background_compaction(
self.cf_name,
interval_seconds=1,
min_sstables=2
)

logger.info("Monitoring compaction progress...")
start_time = time.time()
while time.time() - start_time < 30:
time.sleep(1)
analysis = self.analyze_sstables()
logger.info(f"SSTable count: {analysis['total_count']}")
if analysis['total_count'] <= 5:
logger.info("Compaction target achieved.")
break

self.assertTrue(analysis['total_count'] <= 5, "Compaction failed to reduce SSTables to the target.")
logger.info("Background compaction test completed successfully")

def analyze_sstables(self):
"""Analyze the SSTable files in the database."""
cf_path = Path(self.test_dir) / self.cf_name
sst_files = list(cf_path.glob("sstable_*.sst"))
analysis = {
'total_count': len(sst_files),
'size_distribution': {}
}

for sst in sst_files:
size = sst.stat().st_size
analysis['size_distribution'][size // (1024 * 1024)] = analysis['size_distribution'].get(size // (1024 * 1024), 0) + 1

return analysis

# Allow running this test module directly: python tests/test_compaction.py
if __name__ == '__main__':
unittest.main()
27 changes: 27 additions & 0 deletions tidesdb/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ class TidesDBMemtableDS:
else:
raise FileNotFoundError(f"Library '{library_name}' not found")

# ctypes signatures for the native compaction entry points.
# NOTE(review): assumes `lib` was loaded earlier in this module via ctypes;
# signatures must match the TidesDB C header — confirm against the C API.
# Args: db handle pointer, column family name, max worker threads.
lib.tidesdb_compact_sstables.argtypes = [
POINTER(ctypes.c_void_p),
c_char_p,
c_int
]
# Returns an int status code; 0 is treated as success by the wrappers below.
lib.tidesdb_compact_sstables.restype = c_int

# Args: db handle pointer, column family name, merge interval (seconds),
# minimum SSTable count before a merge is triggered.
lib.tidesdb_start_background_partial_merge.argtypes = [
POINTER(ctypes.c_void_p),
c_char_p,
c_int,
c_int
]
# Returns an int status code; 0 is treated as success by the wrappers below.
lib.tidesdb_start_background_partial_merge.restype = c_int

class TidesDB:
"""TidesDB main database class."""

Expand Down Expand Up @@ -104,6 +119,18 @@ def delete(self, column_family_name, key):
result = lib.tidesdb_delete(self.tdb, c_name, c_key, c_size_t(len(key)))
if result != 0:
raise Exception("Failed to delete key-value pair")

def compact_sstables(self, column_family_name, max_threads):
    """Run a one-shot, multi-threaded compaction of a column family's SSTables.

    Args:
        column_family_name: Name of the column family to compact.
        max_threads: Maximum number of worker threads the native layer may use.

    Raises:
        Exception: If the native call returns a non-zero status code.
    """
    c_name = create_string_buffer(column_family_name.encode('utf-8'))
    result = lib.tidesdb_compact_sstables(self.tdb, c_name, c_int(max_threads))
    if result != 0:
        # Fix: surface the native status code instead of discarding it.
        raise Exception(f"Failed to compact SSTables (error code {result})")

def start_background_compaction(self, column_family_name, interval_seconds, min_sstables):
    """Start the native background partial-merge thread for a column family.

    Args:
        column_family_name: Name of the column family to compact.
        interval_seconds: How often the background thread checks for work.
        min_sstables: Minimum SSTable count required before a merge runs.

    Raises:
        Exception: If the native call returns a non-zero status code.
    """
    c_name = create_string_buffer(column_family_name.encode('utf-8'))
    result = lib.tidesdb_start_background_partial_merge(self.tdb, c_name, c_int(interval_seconds), c_int(min_sstables))
    if result != 0:
        # Fix: surface the native status code instead of discarding it.
        raise Exception(f"Failed to start background compaction (error code {result})")

class Cursor:
"""Cursor class for iterating over column family key-value pairs."""
Expand Down