From d6e88bb27c5b2d2b626737293096482dfe716a78 Mon Sep 17 00:00:00 2001 From: dvasilov Date: Mon, 23 Dec 2019 12:28:45 +0200 Subject: [PATCH 1/2] Bench MVP --- src/benchmark/__init__.py | 0 src/benchmark/main.py | 35 +++++++++++ src/benchmark/orm.py | 24 ++++++++ src/benchmark/repository.py | 27 +++++++++ src/benchmark/unit_of_work.py | 57 ++++++++++++++++++ src/benchmark/utils.py | 109 ++++++++++++++++++++++++++++++++++ 6 files changed, 252 insertions(+) create mode 100644 src/benchmark/__init__.py create mode 100644 src/benchmark/main.py create mode 100644 src/benchmark/orm.py create mode 100644 src/benchmark/repository.py create mode 100644 src/benchmark/unit_of_work.py create mode 100644 src/benchmark/utils.py diff --git a/src/benchmark/__init__.py b/src/benchmark/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/benchmark/main.py b/src/benchmark/main.py new file mode 100644 index 00000000..1429d95b --- /dev/null +++ b/src/benchmark/main.py @@ -0,0 +1,35 @@ +import time + +from benchmark.orm import start_mappers +from benchmark.utils import prepare_database, clean_up, check_time +from benchmark.unit_of_work import IsolationUnitOfWork, WithForUpdateUnitOfWork + +if __name__ == "__main__": + + # first we will prepare new table + start_mappers() + isolation_uow = IsolationUnitOfWork() + with_update_uow = WithForUpdateUnitOfWork() + + prepare_database() + + with_for_update_time = check_time(with_update_uow) + print("with_for_update_time ", with_for_update_time) + + isolation_time = check_time(isolation_uow) + print("isolation_time ", isolation_time) + + clean_up() + + +""" +with_for_update_time 210.55391550064087 +isolation_time 202.72578072547913 + +with_for_update_time 200.93747401237488 +isolation_time 218.2748246192932 + +with_for_update_time 213.71013808250427 +isolation_time 206.7240309715271 + +""" \ No newline at end of file diff --git a/src/benchmark/orm.py b/src/benchmark/orm.py new file mode 100644 index 00000000..e28eac80 --- 
/dev/null +++ b/src/benchmark/orm.py @@ -0,0 +1,24 @@ +import logging + +from allocation import model +from sqlalchemy import Table, MetaData, Column, Integer, String, Date + +from sqlalchemy.orm import mapper + +logger = logging.getLogger(__name__) + +metadata = MetaData() + +batches = Table( + 'batches_benchmark', metadata, + Column('id', Integer, primary_key=True, autoincrement=True), + Column('reference', String(255)), + Column('sku', String(255)), + Column('_purchased_quantity', Integer, nullable=False), + Column('eta', Date, nullable=True), +) + + +def start_mappers(): + logger.info("Starting mappers") + mapper(model.Batch, batches) \ No newline at end of file diff --git a/src/benchmark/repository.py b/src/benchmark/repository.py new file mode 100644 index 00000000..506b0644 --- /dev/null +++ b/src/benchmark/repository.py @@ -0,0 +1,27 @@ +from allocation import model + + +class BatchRepository: + + def __init__(self, session): + super().__init__() + self.session = session + + def add(self, batch): + self.session.add(batch) + + def get(self, ref): + return self.session.query(model.Batch).filter_by(reference=ref).first() + + +class BatchWithForUpdateRepository: + + def __init__(self, session): + super().__init__() + self.session = session + + def add(self, batch): + self.session.add(batch) + + def get(self, ref): + return self.session.query(model.Batch).filter_by(reference=ref).with_for_update().first() diff --git a/src/benchmark/unit_of_work.py b/src/benchmark/unit_of_work.py new file mode 100644 index 00000000..c0130d79 --- /dev/null +++ b/src/benchmark/unit_of_work.py @@ -0,0 +1,57 @@ + +from allocation import config +from allocation.unit_of_work import AbstractUnitOfWork +from benchmark.repository import BatchRepository, BatchWithForUpdateRepository +from sqlalchemy import create_engine +from sqlalchemy.orm import Session, sessionmaker + +WITH_FOR_UPDATE__SESSION_FACTORY = sessionmaker(bind=create_engine( + config.get_postgres_uri(), +)) + 
+SERIALIZABLE_ISOLATION_SESSION_FACTORY = sessionmaker(bind=create_engine( + config.get_postgres_uri(), + isolation_level="SERIALIZABLE", +)) + + +class UnitOfWorkBenchmark(AbstractUnitOfWork): + + def __init__(self, session_factory): + self.session_factory = session_factory + + def __exit__(self, *args): + super().__exit__(*args) + self.session.close() + + def _commit(self): + self.session.commit() + + def rollback(self): + self.session.rollback() + + def commit(self): + self._commit() + + +class IsolationUnitOfWork(UnitOfWorkBenchmark): + + def __init__(self, session_factory=SERIALIZABLE_ISOLATION_SESSION_FACTORY): + self.session_factory = session_factory + super().__init__(session_factory) + + def __enter__(self): + self.session = self.session_factory() # type: Session + self.batches = BatchRepository(self.session) + return super().__enter__() + + +class WithForUpdateUnitOfWork(UnitOfWorkBenchmark): + def __init__(self, session_factory=WITH_FOR_UPDATE__SESSION_FACTORY): + self.session_factory = session_factory + super().__init__(session_factory) + + def __enter__(self): + self.session = self.session_factory() # type: Session + self.batches = BatchWithForUpdateRepository(self.session) + return super().__enter__() diff --git a/src/benchmark/utils.py b/src/benchmark/utils.py new file mode 100644 index 00000000..eee5df00 --- /dev/null +++ b/src/benchmark/utils.py @@ -0,0 +1,109 @@ +import datetime +import random +import time +import uuid + +import psycopg2 +import multiprocessing as mp + + +def get_conn_and_cursor(): + conn = psycopg2.connect( + dbname="allocation", + user="allocation", + host="localhost", + port="54321", + password="abc123" + ) + cur = conn.cursor() + return conn, cur + + +def get_random_batch_row(): + ref = str(uuid.uuid4()) + sku = 'bench-' + str(uuid.uuid4()) + eta = datetime.datetime.now() + datetime.timedelta(days=random.randint(0, 1000)) + eta = eta.utcnow() + _purchase_quantity = random.randint(1, 1000) + return ref, sku, _purchase_quantity, 
eta + + +def fill_database_with_fake_batches(con, cur, number_of_batches=1000): + batch_insert_query = "INSERT into batches_benchmark (reference, sku, _purchased_quantity, eta) VALUES ('%s', '%s', '%s', '%s')" + + for _ in range(number_of_batches): + batch = get_random_batch_row() + cur.execute(batch_insert_query % batch) + + con.commit() + + +def drop_data_from_db(con, cur): + drop_bench_batches_query = "DELETE from batches_benchmark where sku like 'bench-%'" + cur.execute(drop_bench_batches_query) + + +def prepare_database(): + con, cur = get_conn_and_cursor() + create_table(con, cur) + fill_database_with_fake_batches(con, cur) + con.close() + + +def clean_up(): + con, cur = get_conn_and_cursor() + drop_data_from_db(con, cur) + drop_table(con, cur) + + +def check_time(uow): + start_time = time.time() + processes = [mp.Process(target=update_batches(uow)) for x in range(1, 5)] + for p in processes: + p.start() + + for p in processes: + p.join() + + end_time = time.time() - start_time + return end_time + + +def update_batches(uow): + refs = get_benchmark_refs() + for _ in range(10000): + ref = random.choice(refs) + update_batch(ref, uow) + + +def get_benchmark_refs(): + con, cur = get_conn_and_cursor() + cur.execute("SELECT reference from batches_benchmark where sku like 'bench-%'") + references = cur.fetchall() + con.close() + return references + + +def update_batch(ref, uow): + with uow: + batch = uow.batches.get(ref) + batch._purchased_quantity = batch._purchased_quantity + 1 + uow.session.add(batch) + uow.commit() + + +def create_table(con, cur): + cur.execute(""" + CREATE TABLE IF NOT EXISTS batches_benchmark ( + id SERIAL primary key, + reference VARCHAR (255) unique, + sku VARCHAR (255), + _purchased_quantity int, + eta date); + """) + con.commit() + + +def drop_table(con, cur): + cur.execute("DROP TABLE IF EXISTS batches_benchmark") + con.commit() \ No newline at end of file From 4605989d7b5cee69bf030fdba772732decaf8f0c Mon Sep 17 00:00:00 2001 From: 
dvasilov Date: Tue, 24 Dec 2019 14:00:52 +0200 Subject: [PATCH 2/2] Add retry for isolation level --- src/benchmark/main.py | 27 +++++++------- src/benchmark/repository.py | 6 ++++ src/benchmark/utils.py | 72 +++++++++++++++++++++++++------------ 3 files changed, 69 insertions(+), 36 deletions(-) diff --git a/src/benchmark/main.py b/src/benchmark/main.py index 1429d95b..0db590ea 100644 --- a/src/benchmark/main.py +++ b/src/benchmark/main.py @@ -1,7 +1,7 @@ import time from benchmark.orm import start_mappers -from benchmark.utils import prepare_database, clean_up, check_time +from benchmark.utils import prepare_database, clean_up, check_time, get_skus from benchmark.unit_of_work import IsolationUnitOfWork, WithForUpdateUnitOfWork if __name__ == "__main__": @@ -12,24 +12,25 @@ with_update_uow = WithForUpdateUnitOfWork() prepare_database() + skus = get_skus() + with_for_update_time, with_for_update_errors = check_time(skus, with_update_uow) - with_for_update_time = check_time(with_update_uow) - print("with_for_update_time ", with_for_update_time) + isolation_time, isolation_errors = check_time(skus, isolation_uow) - isolation_time = check_time(isolation_uow) - print("isolation_time ", isolation_time) + print("\n" * 10) + print(f"with_for_update_time : {with_for_update_time} skus_with_errors_count: {len(with_for_update_errors)}" + f" total_errors: {sum([x[0] for x in with_for_update_errors])}") + print(f"isolation_time : {isolation_time}; skus_with_errors_count: {len(isolation_errors)}" + f" total_errors: {sum([x[0] for x in isolation_errors])}") + # Let's see the isolation benchmark time as a percentage of the with_for_update time + print(f"Percent of time: {(isolation_time * 100) / with_for_update_time}") clean_up() """ -with_for_update_time 210.55391550064087 -isolation_time 202.72578072547913 - -with_for_update_time 200.93747401237488 -isolation_time 218.2748246192932 - -with_for_update_time 213.71013808250427 -isolation_time 206.7240309715271 +with_for_update_time : 4.8201072216033936 
skus_with_errors_count: 9 total_errors: 0 +isolation_time : 3.593703269958496; skus_with_errors_count: 9 total_errors: 801 +Percent of time: 74.55650060753341 """ \ No newline at end of file diff --git a/src/benchmark/repository.py b/src/benchmark/repository.py index 506b0644..7f277e50 100644 --- a/src/benchmark/repository.py +++ b/src/benchmark/repository.py @@ -13,6 +13,9 @@ def add(self, batch): def get(self, ref): return self.session.query(model.Batch).filter_by(reference=ref).first() + def get_by_sku(self, sku): + return self.session.query(model.Batch).filter_by(sku=sku).all() + class BatchWithForUpdateRepository: @@ -25,3 +28,6 @@ def add(self, batch): def get(self, ref): return self.session.query(model.Batch).filter_by(reference=ref).with_for_update().first() + + def get_by_sku(self, sku): + return self.session.query(model.Batch).filter_by(sku=sku).with_for_update().all() diff --git a/src/benchmark/utils.py b/src/benchmark/utils.py index eee5df00..dc645477 100644 --- a/src/benchmark/utils.py +++ b/src/benchmark/utils.py @@ -1,10 +1,11 @@ import datetime +import multiprocessing as mp import random import time import uuid import psycopg2 -import multiprocessing as mp +from pprint import pprint def get_conn_and_cursor(): @@ -19,20 +20,20 @@ def get_conn_and_cursor(): return conn, cur -def get_random_batch_row(): +def get_random_batch_row(skus): ref = str(uuid.uuid4()) - sku = 'bench-' + str(uuid.uuid4()) + sku = random.choice(skus) eta = datetime.datetime.now() + datetime.timedelta(days=random.randint(0, 1000)) eta = eta.utcnow() _purchase_quantity = random.randint(1, 1000) return ref, sku, _purchase_quantity, eta -def fill_database_with_fake_batches(con, cur, number_of_batches=1000): +def fill_database_with_fake_batches(con, cur, number_of_batches=10000): batch_insert_query = "INSERT into batches_benchmark (reference, sku, _purchased_quantity, eta) VALUES ('%s', '%s', '%s', '%s')" - + skus = ["bench-" + str(uuid.uuid4()) for _ in range(100)] for _ in 
range(number_of_batches): - batch = get_random_batch_row() + batch = get_random_batch_row(skus) cur.execute(batch_insert_query % batch) con.commit() @@ -56,9 +57,12 @@ def clean_up(): drop_table(con, cur) -def check_time(uow): +def check_time(skus, uow): start_time = time.time() - processes = [mp.Process(target=update_batches(uow)) for x in range(1, 5)] + manager = mp.Manager() + errors_count = manager.list() + + processes = [mp.Process(target=update_batches, args=(skus, uow, errors_count)) for _ in range(1, 10)] for p in processes: p.start() @@ -66,14 +70,7 @@ def check_time(uow): p.join() end_time = time.time() - start_time - return end_time - - -def update_batches(uow): - refs = get_benchmark_refs() - for _ in range(10000): - ref = random.choice(refs) - update_batch(ref, uow) + return end_time, errors_count @@ -84,12 +81,41 @@ def get_benchmark_refs(): return references -def update_batch(ref, uow): - with uow: - batch = uow.batches.get(ref) - batch._purchased_quantity = batch._purchased_quantity + 1 - uow.session.add(batch) - uow.commit() +def get_skus(): + con, cur = get_conn_and_cursor() + cur.execute("SELECT sku from batches_benchmark where sku like 'bench-%'") + skus = cur.fetchall() + con.close() + return skus + + +def update_batches(skus, uow, errors_count): + # total number of unrecovered errors for this process + errors = 0 + for _ in range(100): + with uow: + # we want to count only those errors that were not fixed by retries + sku_error = 0 + sku = random.choice(skus) + batches = uow.batches.get_by_sku(sku) + wait_time = 1 + for batch in batches: + batch._purchased_quantity = batch._purchased_quantity + 1 + uow.session.add(batch) + for _ in range(10): + try: + uow.commit() + except Exception as e: + sku_error += 1 + time.sleep(wait_time) + else: + # a retry fixed the commit problem, so reset the error count for this iteration + sku_error = 0 + break + + errors += sku_error + if errors: + errors_count.append((errors, len(batches))) def 
create_table(con, cur): @@ -106,4 +132,4 @@ def create_table(con, cur): def drop_table(con, cur): cur.execute("DROP TABLE IF EXISTS batches_benchmark") - con.commit() \ No newline at end of file + con.commit()