diff --git a/src/benchmark/__init__.py b/src/benchmark/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/benchmark/main.py b/src/benchmark/main.py
new file mode 100644
index 00000000..0db590ea
--- /dev/null
+++ b/src/benchmark/main.py
@@ -0,0 +1,36 @@
+import time
+
+from benchmark.orm import start_mappers
+from benchmark.utils import prepare_database, clean_up, check_time, get_skus
+from benchmark.unit_of_work import IsolationUnitOfWork, WithForUpdateUnitOfWork
+
+if __name__ == "__main__":
+
+    # first we will prepare new table
+    start_mappers()
+    isolation_uow = IsolationUnitOfWork()
+    with_update_uow = WithForUpdateUnitOfWork()
+
+    prepare_database()
+    skus = get_skus()
+    with_for_update_time, with_for_update_errors = check_time(skus, with_update_uow)
+
+    isolation_time, isolation_errors = check_time(skus, isolation_uow)
+
+    print("\n" * 10)
+    print(f"with_for_update_time : {with_for_update_time} skus_with_errors_count: {len(with_for_update_errors)}"
+          f" total_errors: {sum([x[0] for x in with_for_update_errors])}")
+    print(f"isolation_time : {isolation_time}; skus_with_errors_count: {len(isolation_errors)}"
+          f" total_errors: {sum([x[0] for x in isolation_errors])}")
+
+    # Lets see percentage of time getting for bench
+    print(f"Percent of time: {(isolation_time * 100) / with_for_update_time}")
+    clean_up()
+
+
+"""
+
+with_for_update_time : 4.8201072216033936 skus_with_errors_count: 9 total_errors: 0
+isolation_time : 3.593703269958496; skus_with_errors_count: 9 total_errors: 801
+Percent of time: 74.55650060753341
+"""
\ No newline at end of file
diff --git a/src/benchmark/orm.py b/src/benchmark/orm.py
new file mode 100644
index 00000000..e28eac80
--- /dev/null
+++ b/src/benchmark/orm.py
@@ -0,0 +1,24 @@
+import logging
+
+from allocation import model
+from sqlalchemy import Table, MetaData, Column, Integer, String, Date
+
+from sqlalchemy.orm import mapper
+
+logger = logging.getLogger(__name__)
+
+metadata = MetaData()
+
+batches = Table(
+    'batches_benchmark', metadata,
+    Column('id', Integer, primary_key=True, autoincrement=True),
+    Column('reference', String(255)),
+    Column('sku', String(255)),
+    Column('_purchased_quantity', Integer, nullable=False),
+    Column('eta', Date, nullable=True),
+)
+
+
+def start_mappers():
+    logger.info("Starting mappers")
+    mapper(model.Batch, batches)
\ No newline at end of file
diff --git a/src/benchmark/repository.py b/src/benchmark/repository.py
new file mode 100644
index 00000000..7f277e50
--- /dev/null
+++ b/src/benchmark/repository.py
@@ -0,0 +1,33 @@
+from allocation import model
+
+
+class BatchRepository:
+
+    def __init__(self, session):
+        super().__init__()
+        self.session = session
+
+    def add(self, batch):
+        self.session.add(batch)
+
+    def get(self, ref):
+        return self.session.query(model.Batch).filter_by(reference=ref).first()
+
+    def get_by_sku(self, sku):
+        return self.session.query(model.Batch).filter_by(sku=sku).all()
+
+
+class BatchWithForUpdateRepository:
+
+    def __init__(self, session):
+        super().__init__()
+        self.session = session
+
+    def add(self, batch):
+        self.session.add(batch)
+
+    def get(self, ref):
+        return self.session.query(model.Batch).filter_by(reference=ref).with_for_update().first()
+
+    def get_by_sku(self, sku):
+        return self.session.query(model.Batch).filter_by(sku=sku).with_for_update().all()
diff --git a/src/benchmark/unit_of_work.py b/src/benchmark/unit_of_work.py
new file mode 100644
index 00000000..c0130d79
--- /dev/null
+++ b/src/benchmark/unit_of_work.py
@@ -0,0 +1,57 @@
+
+from allocation import config
+from allocation.unit_of_work import AbstractUnitOfWork
+from benchmark.repository import BatchRepository, BatchWithForUpdateRepository
+from sqlalchemy import create_engine
+from sqlalchemy.orm import Session, sessionmaker
+
+WITH_FOR_UPDATE__SESSION_FACTORY = sessionmaker(bind=create_engine(
+    config.get_postgres_uri(),
+))
+
+SERIALIZABLE_ISOLATION_SESSION_FACTORY = sessionmaker(bind=create_engine(
+    config.get_postgres_uri(),
+    isolation_level="SERIALIZABLE",
+))
+
+
+class UnitOfWorkBenchmark(AbstractUnitOfWork):
+
+    def __init__(self, session_factory):
+        self.session_factory = session_factory
+
+    def __exit__(self, *args):
+        super().__exit__(*args)
+        self.session.close()
+
+    def _commit(self):
+        self.session.commit()
+
+    def rollback(self):
+        self.session.rollback()
+
+    def commit(self):
+        self._commit()
+
+
+class IsolationUnitOfWork(UnitOfWorkBenchmark):
+
+    def __init__(self, session_factory=SERIALIZABLE_ISOLATION_SESSION_FACTORY):
+        self.session_factory = session_factory
+        super().__init__(session_factory)
+
+    def __enter__(self):
+        self.session = self.session_factory()  # type: Session
+        self.batches = BatchRepository(self.session)
+        return super().__enter__()
+
+
+class WithForUpdateUnitOfWork(UnitOfWorkBenchmark):
+    def __init__(self, session_factory=WITH_FOR_UPDATE__SESSION_FACTORY):
+        self.session_factory = session_factory
+        super().__init__(session_factory)
+
+    def __enter__(self):
+        self.session = self.session_factory()  # type: Session
+        self.batches = BatchWithForUpdateRepository(self.session)
+        return super().__enter__()
diff --git a/src/benchmark/utils.py b/src/benchmark/utils.py
new file mode 100644
index 00000000..dc645477
--- /dev/null
+++ b/src/benchmark/utils.py
@@ -0,0 +1,135 @@
+import datetime
+import multiprocessing as mp
+import random
+import time
+import uuid
+
+import psycopg2
+from pprint import pprint
+
+
+def get_conn_and_cursor():
+    conn = psycopg2.connect(
+        dbname="allocation",
+        user="allocation",
+        host="localhost",
+        port="54321",
+        password="abc123"
+    )
+    cur = conn.cursor()
+    return conn, cur
+
+
+def get_random_batch_row(skus):
+    ref = str(uuid.uuid4())
+    sku = random.choice(skus)
+    eta = datetime.datetime.utcnow() + datetime.timedelta(days=random.randint(0, 1000))
+    eta = eta.date()
+    _purchase_quantity = random.randint(1, 1000)
+    return ref, sku, _purchase_quantity, eta
+
+
+def fill_database_with_fake_batches(con, cur, number_of_batches=10000):
+    batch_insert_query = "INSERT into batches_benchmark (reference, sku, _purchased_quantity, eta) VALUES ('%s', '%s', '%s', '%s')"
+    skus = ["bench-" + str(uuid.uuid4()) for _ in range(100)]
+    for _ in range(number_of_batches):
+        batch = get_random_batch_row(skus)
+        cur.execute(batch_insert_query % batch)
+
+    con.commit()
+
+
+def drop_data_from_db(con, cur):
+    drop_bench_batches_query = "DELETE from batches_benchmark where sku like 'bench-%'"
+    cur.execute(drop_bench_batches_query)
+
+
+def prepare_database():
+    con, cur = get_conn_and_cursor()
+    create_table(con, cur)
+    fill_database_with_fake_batches(con, cur)
+    con.close()
+
+
+def clean_up():
+    con, cur = get_conn_and_cursor()
+    drop_data_from_db(con, cur)
+    drop_table(con, cur)
+
+
+def check_time(skus, uow):
+    start_time = time.time()
+    manager = mp.Manager()
+    errors_count = manager.list()
+
+    processes = [mp.Process(target=update_batches, args=(skus, uow, errors_count)) for _ in range(1, 10)]
+    for p in processes:
+        p.start()
+
+    for p in processes:
+        p.join()
+
+    end_time = time.time() - start_time
+    return end_time, errors_count
+
+
+def get_benchmark_refs():
+    con, cur = get_conn_and_cursor()
+    cur.execute("SELECT reference from batches_benchmark where sku like 'bench-%'")
+    references = [row[0] for row in cur.fetchall()]
+    con.close()
+    return references
+
+
+def get_skus():
+    con, cur = get_conn_and_cursor()
+    cur.execute("SELECT sku from batches_benchmark where sku like 'bench-%'")
+    skus = [row[0] for row in cur.fetchall()]
+    con.close()
+    return skus
+
+
+def update_batches(skus, uow, errors_count):
+    # this will be all number of errors for 1 process
+    errors = 0
+    for _ in range(100):
+        with uow:
+            # we want to count only those errors that were not fixed by retries
+            sku_error = 0
+            sku = random.choice(skus)
+            batches = uow.batches.get_by_sku(sku)
+            wait_time = 1
+            for batch in batches:
+                batch._purchased_quantity = batch._purchased_quantity + 1
+                uow.session.add(batch)
+            for _ in range(10):
+                try:
+                    uow.commit()
+                except Exception:
+                    sku_error += 1
+                    time.sleep(wait_time)
+                else:
+                    # retry fixed the commit problem, so reset the error count for this iteration
+                    sku_error = 0
+                    break
+
+        errors += sku_error
+    if errors:
+        errors_count.append((errors, len(batches)))
+
+
+def create_table(con, cur):
+    cur.execute("""
+        CREATE TABLE IF NOT EXISTS batches_benchmark (
+        id SERIAL primary key,
+        reference VARCHAR (255) unique,
+        sku VARCHAR (255),
+        _purchased_quantity int,
+        eta date);
+    """)
+    con.commit()
+
+
+def drop_table(con, cur):
+    cur.execute("DROP TABLE IF EXISTS batches_benchmark")
+    con.commit()