diff --git a/addok_psql_store/__init__.py b/addok_psql_store/__init__.py index b9e6af7..3994086 100644 --- a/addok_psql_store/__init__.py +++ b/addok_psql_store/__init__.py @@ -1,6 +1,7 @@ import os +from pgcopy import CopyManager -from psycopg2 import pool +from psycopg2 import pool, OperationalError, InterfaceError from psycopg2.extras import execute_values from addok.config import config @@ -8,7 +9,7 @@ class PSQLStore: def __init__(self, *args, **kwargs): - self.pool = pool.SimpleConnectionPool(minconn=8, maxconn=64, + self.pool = pool.SimpleConnectionPool(minconn=1, maxconn=2, dsn=config.PG_CONFIG) create_table_query = ''' CREATE TABLE IF NOT EXISTS @@ -25,7 +26,13 @@ def __init__(self, *args, **kwargs): def getconn(self): # Use pid as connection id so we can reuse the connection within the # same process. - return self.pool.getconn(key=os.getpid()) + conn = self.pool.getconn(key=os.getpid()) + try: + c = conn.cursor() + return conn + except (OperationalError, InterfaceError) as err: + self.pool.putconn(conn, key=os.getpid()) + return self.getconn() def fetch(self, *keys): # Using ANY results in valid SQL if `keys` is empty. @@ -39,19 +46,24 @@ def fetch(self, *keys): def upsert(self, *docs): """ - Potential performance boost, using copy_from: - * https://gist.github.com/jsheedy/efa9a69926a754bebf0e9078fd085df6 - * https://gist.github.com/jsheedy/ed81cdf18190183b3b7d - - Or event copy_expert for mixed binary content: - * http://stackoverflow.com/a/8150329 + Use copy_from to load the binary data into db, in case of conflicts, + we switch to execute_values, with "ON CONFLICT DO NOTHING" only the + failing row will be ignored instead of the whole chunk (docs) + :param docs: + :return: """ - insert_into_query = ''' - INSERT INTO {PG_TABLE} (key, data) VALUES %s - ON CONFLICT DO NOTHING - '''.format(**config) with self.getconn() as conn, conn.cursor() as curs: - execute_values(curs, insert_into_query, docs) + mgr = CopyManager(conn, '{PG_TABLE}'.format(**config), ['key', 'data']) + try: + mgr.copy(docs) # will raise error if key exists + except: + insert_into_query = ''' + INSERT INTO {PG_TABLE} (key, data) VALUES %s + ON CONFLICT DO NOTHING + '''.format(**config) + execute_values(curs, insert_into_query, docs) + else: + conn.commit() def remove(self, *keys): delete_from_query = ''' diff --git a/requirements.txt b/requirements.txt index 3b4eadd..5f0d796 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ psycopg2==2.7.1 +pgcopy