Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added src/benchmark/__init__.py
Empty file.
36 changes: 36 additions & 0 deletions src/benchmark/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import time

from benchmark.orm import start_mappers
from benchmark.utils import prepare_database, clean_up, check_time, get_skus
from benchmark.unit_of_work import IsolationUnitOfWork, WithForUpdateUnitOfWork

if __name__ == "__main__":

    # first we will prepare new table
    start_mappers()
    isolation_uow = IsolationUnitOfWork()
    with_update_uow = WithForUpdateUnitOfWork()

    prepare_database()
    skus = get_skus()

    # Run the same contended workload once per locking strategy:
    # SELECT ... FOR UPDATE first, then SERIALIZABLE isolation.
    with_for_update_time, with_for_update_errors = check_time(skus, with_update_uow)
    isolation_time, isolation_errors = check_time(skus, isolation_uow)

    # Push worker-process noise off screen before the summary.
    print("\n" * 10)
    print(f"with_for_update_time : {with_for_update_time} skus_with_errors_count: {len(with_for_update_errors)}"
          f" total_errors: {sum(x[0] for x in with_for_update_errors)}")
    print(f"isolation_time : {isolation_time}; skus_with_errors_count: {len(isolation_errors)}"
          f" total_errors: {sum(x[0] for x in isolation_errors)}")

    # Lets see percentage of time getting for bench
    print(f"Percent of time: {(isolation_time * 100) / with_for_update_time}")
    clean_up()


"""

with_for_update_time : 4.8201072216033936 skus_with_errors_count: 9 total_errors: 0
isolation_time : 3.593703269958496; skus_with_errors_count: 9 total_errors: 801
Percent of time: 74.55650060753341
"""
24 changes: 24 additions & 0 deletions src/benchmark/orm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import logging

from allocation import model
from sqlalchemy import Table, MetaData, Column, Integer, String, Date

from sqlalchemy.orm import mapper

logger = logging.getLogger(__name__)

metadata = MetaData()

# Benchmark-only table, kept under its own name so runs can be created and
# dropped without touching production data. Presumably mirrors the schema of
# the project's real "batches" table — confirm against allocation.orm.
batches = Table(
    'batches_benchmark', metadata,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('reference', String(255)),
    Column('sku', String(255)),
    Column('_purchased_quantity', Integer, nullable=False),
    Column('eta', Date, nullable=True),
)


def start_mappers():
    """Classically map the domain ``Batch`` model onto the benchmark table.

    NOTE(review): ``sqlalchemy.orm.mapper`` is the legacy classical-mapping
    API (removed in SQLAlchemy 2.0) — confirm the project pins SQLAlchemy
    to a version that still provides it.
    """
    logger.info("Starting mappers")
    mapper(model.Batch, batches)
33 changes: 33 additions & 0 deletions src/benchmark/repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from allocation import model


class BatchRepository:
    """Repository over Batch rows; reads take no row-level locks."""

    def __init__(self, session):
        super().__init__()
        self.session = session

    def add(self, batch):
        """Stage *batch* for insertion on the next commit."""
        self.session.add(batch)

    def get(self, ref):
        """Return the first batch with reference *ref*, or None."""
        query = self.session.query(model.Batch)
        return query.filter_by(reference=ref).first()

    def get_by_sku(self, sku):
        """Return every batch for *sku* as a list."""
        query = self.session.query(model.Batch)
        return query.filter_by(sku=sku).all()


class BatchWithForUpdateRepository:
    """Repository over Batch rows; reads lock rows via SELECT ... FOR UPDATE."""

    def __init__(self, session):
        super().__init__()
        self.session = session

    def add(self, batch):
        """Stage *batch* for insertion on the next commit."""
        self.session.add(batch)

    def get(self, ref):
        """Return the first batch with reference *ref* (row locked), or None."""
        query = self.session.query(model.Batch).filter_by(reference=ref)
        return query.with_for_update().first()

    def get_by_sku(self, sku):
        """Return every batch for *sku* as a list, locking the rows."""
        query = self.session.query(model.Batch).filter_by(sku=sku)
        return query.with_for_update().all()
57 changes: 57 additions & 0 deletions src/benchmark/unit_of_work.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@

from allocation import config
from allocation.unit_of_work import AbstractUnitOfWork
from benchmark.repository import BatchRepository, BatchWithForUpdateRepository
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker

# Session factory at the engine's default isolation level; row conflicts are
# handled pessimistically per-query via SELECT ... FOR UPDATE (see
# BatchWithForUpdateRepository).
WITH_FOR_UPDATE__SESSION_FACTORY = sessionmaker(bind=create_engine(
    config.get_postgres_uri(),
))

# Session factory running every transaction at SERIALIZABLE isolation — the
# database aborts conflicting transactions, which the workers then retry.
SERIALIZABLE_ISOLATION_SESSION_FACTORY = sessionmaker(bind=create_engine(
    config.get_postgres_uri(),
    isolation_level="SERIALIZABLE",
))


class UnitOfWorkBenchmark(AbstractUnitOfWork):
    """Base unit of work for the benchmark.

    Subclasses bind ``self.session`` and ``self.batches`` in ``__enter__``;
    this base provides commit/rollback and closes the session on exit.
    """

    def __init__(self, session_factory):
        # Factory producing a fresh SQLAlchemy Session per ``with`` block.
        self.session_factory = session_factory

    def __exit__(self, *args):
        # Let AbstractUnitOfWork do its exit handling first, then always
        # release the session back to the pool.
        super().__exit__(*args)
        self.session.close()

    def _commit(self):
        self.session.commit()

    def rollback(self):
        self.session.rollback()

    def commit(self):
        # NOTE(review): the cosmicpython-style AbstractUnitOfWork usually
        # defines commit() delegating to _commit() already — confirm this
        # override is actually needed.
        self._commit()


class IsolationUnitOfWork(UnitOfWorkBenchmark):
    """Unit of work whose sessions run at SERIALIZABLE isolation."""

    def __init__(self, session_factory=SERIALIZABLE_ISOLATION_SESSION_FACTORY):
        # The original assigned self.session_factory here AND in the base
        # __init__; the duplicate assignment is dropped.
        super().__init__(session_factory)

    def __enter__(self):
        self.session: Session = self.session_factory()
        # Plain repository — conflict detection is left to the database.
        self.batches = BatchRepository(self.session)
        return super().__enter__()


class WithForUpdateUnitOfWork(UnitOfWorkBenchmark):
    """Unit of work whose repository locks rows with SELECT ... FOR UPDATE."""

    def __init__(self, session_factory=WITH_FOR_UPDATE__SESSION_FACTORY):
        # The original assigned self.session_factory here AND in the base
        # __init__; the duplicate assignment is dropped.
        super().__init__(session_factory)

    def __enter__(self):
        self.session: Session = self.session_factory()
        # Locking repository — conflicts are avoided pessimistically.
        self.batches = BatchWithForUpdateRepository(self.session)
        return super().__enter__()
135 changes: 135 additions & 0 deletions src/benchmark/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import datetime
import multiprocessing as mp
import random
import time
import uuid

import psycopg2
from pprint import pprint


def get_conn_and_cursor():
    """Open a psycopg2 connection to the benchmark database.

    Returns a ``(connection, cursor)`` pair; the caller owns both and is
    responsible for closing the connection.

    NOTE(review): credentials are hard-coded — consider sourcing them from
    the environment / allocation.config instead.
    """
    connection = psycopg2.connect(
        dbname="allocation",
        user="allocation",
        host="localhost",
        port="54321",
        password="abc123",
    )
    return connection, connection.cursor()


def get_random_batch_row(skus):
    """Build one random batches_benchmark row.

    Returns ``(reference, sku, purchased_quantity, eta)`` with a uuid4
    reference, a sku drawn from *skus*, a quantity in [1, 1000] and an eta
    up to 1000 days in the future (naive UTC).

    Bug fix: the original called ``eta.utcnow()`` on the shifted datetime —
    ``utcnow`` is a classmethod, so the random day offset was discarded and
    every row got the current time as its eta.
    """
    ref = str(uuid.uuid4())
    sku = random.choice(skus)
    eta = datetime.datetime.utcnow() + datetime.timedelta(days=random.randint(0, 1000))
    _purchase_quantity = random.randint(1, 1000)
    return ref, sku, _purchase_quantity, eta


def fill_database_with_fake_batches(con, cur, number_of_batches=10000):
    """Insert *number_of_batches* random rows spread over 100 random skus.

    Commits once at the end. Uses psycopg2 parameter binding instead of
    Python %-interpolation so values are escaped by the driver — the
    original built SQL by string formatting, which breaks on quotes and is
    an injection hazard.
    """
    batch_insert_query = (
        "INSERT into batches_benchmark (reference, sku, _purchased_quantity, eta) "
        "VALUES (%s, %s, %s, %s)"
    )
    skus = ["bench-" + str(uuid.uuid4()) for _ in range(100)]
    for _ in range(number_of_batches):
        cur.execute(batch_insert_query, get_random_batch_row(skus))

    con.commit()


def drop_data_from_db(con, cur):
    """Delete every benchmark row (skus prefixed ``bench-``) and commit.

    The original never committed — the DELETE only stuck because a later
    commit on the same connection (drop_table's) flushed it. Committing
    here makes the function safe to call on its own.
    """
    drop_bench_batches_query = "DELETE from batches_benchmark where sku like 'bench-%'"
    cur.execute(drop_bench_batches_query)
    con.commit()


def prepare_database():
    """Create the benchmark table and seed it with fake batches."""
    connection, cursor = get_conn_and_cursor()
    create_table(connection, cursor)
    fill_database_with_fake_batches(connection, cursor)
    connection.close()


def clean_up():
    """Remove benchmark data, drop the table, and close the connection.

    The original leaked the connection; it is now closed explicitly,
    matching prepare_database().
    """
    con, cur = get_conn_and_cursor()
    drop_data_from_db(con, cur)
    drop_table(con, cur)
    con.close()


def check_time(skus, uow):
    """Run ``update_batches`` in 9 concurrent processes and time them.

    Returns ``(elapsed_seconds, errors)`` where *errors* is a plain list of
    ``(error_count, batches_touched)`` tuples collected from the workers.

    The manager's proxy list is copied into a regular list before the
    Manager shuts down — the original returned the live ListProxy and never
    terminated the Manager process, so the result could become unusable
    (and the helper process leaked).
    """
    start_time = time.time()
    with mp.Manager() as manager:
        errors_count = manager.list()

        # range(1, 10) -> 9 worker processes, kept from the original setup.
        processes = [
            mp.Process(target=update_batches, args=(skus, uow, errors_count))
            for _ in range(1, 10)
        ]
        for p in processes:
            p.start()

        for p in processes:
            p.join()

        elapsed = time.time() - start_time
        errors = list(errors_count)

    return elapsed, errors


def get_benchmark_refs():
    """Fetch all benchmark batch references as raw cursor rows."""
    connection, cursor = get_conn_and_cursor()
    cursor.execute("SELECT reference from batches_benchmark where sku like 'bench-%'")
    rows = cursor.fetchall()
    connection.close()
    return rows


def get_skus():
    """Return the benchmark skus as a flat list of strings.

    The original returned raw cursor rows (1-tuples), which callers passed
    straight into ``filter_by(sku=...)`` — that only worked because Postgres
    treats a parenthesized single value as a scalar. Unpacking here makes
    each sku a plain string.
    """
    con, cur = get_conn_and_cursor()
    cur.execute("SELECT sku from batches_benchmark where sku like 'bench-%'")
    skus = [row[0] for row in cur.fetchall()]
    con.close()
    return skus


def update_batches(skus, uow, errors_count):
    """Worker body: repeatedly bump ``_purchased_quantity`` for a random sku.

    Runs 100 iterations; each picks one sku, increments every batch of that
    sku by 1 and commits, retrying a failed commit up to 10 times. Only
    errors that survived all retries are reported into *errors_count*
    (a multiprocessing manager list shared across worker processes).
    """
    # this will be all number of errors for 1 process
    errors = 0
    for _ in range(100):
        with uow:
            # we want to count only those errors that were not fixed by retries
            sku_error = 0
            sku = random.choice(skus)
            batches = uow.batches.get_by_sku(sku)
            # NOTE(review): constant back-off — wait_time is never increased
            # between retries; confirm that is intended.
            wait_time = 1
            for batch in batches:
                batch._purchased_quantity = batch._purchased_quantity + 1
                uow.session.add(batch)
            for _ in range(10):
                try:
                    uow.commit()
                except Exception as e:
                    # commit failed (e.g. serialization/lock error): wait and retry
                    sku_error += 1
                    time.sleep(wait_time)
                else:
                    # a retry fixed the commit problem, so reset the error count for this iteration
                    sku_error = 0
                    break

            errors += sku_error
            # NOTE(review): appends the *cumulative* per-process total on every
            # iteration that ends with unresolved errors — confirm intended.
            if errors:
                errors_count.append((errors, len(batches)))


def create_table(con, cur):
    """Create the batches_benchmark table if it does not already exist."""
    ddl = """
    CREATE TABLE IF NOT EXISTS batches_benchmark (
        id SERIAL primary key,
        reference VARCHAR (255) unique,
        sku VARCHAR (255),
        _purchased_quantity int,
        eta date);
    """
    cur.execute(ddl)
    con.commit()


def drop_table(con, cur):
    """Drop the benchmark table (if present), then commit."""
    statement = "DROP TABLE IF EXISTS batches_benchmark"
    cur.execute(statement)
    con.commit()