Reload connection pool when connection with Postgres is lost #5

Open · wants to merge 2 commits into master
40 changes: 26 additions & 14 deletions addok_psql_store/__init__.py
@@ -1,14 +1,15 @@
 import os
+from pgcopy import CopyManager
 
-from psycopg2 import pool
+from psycopg2 import pool, OperationalError, InterfaceError
 from psycopg2.extras import execute_values
 
 from addok.config import config
 
 
 class PSQLStore:
     def __init__(self, *args, **kwargs):
-        self.pool = pool.SimpleConnectionPool(minconn=8, maxconn=64,
+        self.pool = pool.SimpleConnectionPool(minconn=1, maxconn=2,
                                               dsn=config.PG_CONFIG)
         create_table_query = '''
         CREATE TABLE IF NOT EXISTS
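
For context on the new pool bounds: psycopg2's `SimpleConnectionPool` opens `minconn` connections up front and grows on demand up to `maxconn`, raising `PoolError` once that limit is exhausted. A minimal sketch of the same construction outside addok; the DSN string is a placeholder, the real value comes from `config.PG_CONFIG`:

```python
from psycopg2 import pool

# Placeholder DSN for illustration; the store passes config.PG_CONFIG here.
PG_DSN = 'dbname=addok user=addok host=localhost'

# minconn=1 connection is opened immediately; at most maxconn=2 exist at once.
pg_pool = pool.SimpleConnectionPool(minconn=1, maxconn=2, dsn=PG_DSN)

# The store keys checkouts by os.getpid() so each process reuses one connection.
conn = pg_pool.getconn(key=1234)
pg_pool.putconn(conn, key=1234)
pg_pool.closeall()
```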
@@ -25,7 +26,13 @@ def __init__(self, *args, **kwargs):
     def getconn(self):
         # Use pid as connection id so we can reuse the connection within the
         # same process.
-        return self.pool.getconn(key=os.getpid())
+        conn = self.pool.getconn(key=os.getpid())
+        try:
+            c = conn.cursor()
+            return conn
+        except (OperationalError, InterfaceError) as err:
+            self.pool.putconn(conn, key=os.getpid())
+            return self.getconn()
 
     def fetch(self, *keys):
         # Using ANY results in valid SQL if `keys` is empty.
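
Two details in the hunk above may be worth a second look: the probe cursor `c` is never closed, and `putconn` without `close=True` returns the dead connection to the pool, so the recursive `getconn` call can be handed the same broken connection again. A hedged sketch of an alternative, assuming the same `self.pool` attribute and psycopg2 exceptions; the retry cap and the `SELECT 1` probe are illustrative choices, not part of this PR:

```python
import os

from psycopg2 import OperationalError, InterfaceError


def getconn(self, _retries=3):
    """Drop-in sketch for PSQLStore.getconn: hand out a live connection."""
    conn = self.pool.getconn(key=os.getpid())
    try:
        # Actively probe the connection; a stale socket only shows up once we
        # talk to the server, not when the cursor object is created.
        with conn.cursor() as curs:
            curs.execute('SELECT 1')
        conn.rollback()  # End the probe transaction before handing out.
        return conn
    except (OperationalError, InterfaceError):
        # close=True discards the dead connection instead of recycling it,
        # so the next getconn() call opens a fresh one.
        self.pool.putconn(conn, key=os.getpid(), close=True)
        if _retries <= 0:
            raise
        return self.getconn(_retries - 1)
```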
@@ -39,19 +46,24 @@ def fetch(self, *keys):
 
     def upsert(self, *docs):
         """
-        Potential performance boost, using copy_from:
-        * https://gist.github.com/jsheedy/efa9a69926a754bebf0e9078fd085df6
-        * https://gist.github.com/jsheedy/ed81cdf18190183b3b7d
-
-        Or event copy_expert for mixed binary content:
-        * http://stackoverflow.com/a/8150329
+        Load the binary data into the db with copy_from. On conflict, fall
+        back to execute_values with "ON CONFLICT DO NOTHING", so that only
+        the failing rows are skipped instead of the whole chunk.
+        :param docs: (key, data) pairs to upsert
+        :return: None
         """
-        insert_into_query = '''
-        INSERT INTO {PG_TABLE} (key, data) VALUES %s
-        ON CONFLICT DO NOTHING
-        '''.format(**config)
         with self.getconn() as conn, conn.cursor() as curs:
-            execute_values(curs, insert_into_query, docs)
+            mgr = CopyManager(conn, '{PG_TABLE}'.format(**config), ['key', 'data'])
+            try:
+                mgr.copy(docs)  # Will raise an error if a key already exists.
+            except:
+                insert_into_query = '''
+                INSERT INTO {PG_TABLE} (key, data) VALUES %s
+                ON CONFLICT DO NOTHING
+                '''.format(**config)
+                execute_values(curs, insert_into_query, docs)
+            else:
+                conn.commit()
 
     def remove(self, *keys):
         delete_from_query = '''
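
One more point worth checking in the `upsert` fallback: once `mgr.copy()` fails, the open transaction is aborted, so running `execute_values` on the same connection without a rollback would most likely fail with `InFailedSqlTransaction`. A hedged sketch with an explicit rollback, reusing the same `config`, `CopyManager` and `execute_values` as the diff; narrowing the bare `except` to `psycopg2.Error` is an editorial suggestion, not something this PR does:

```python
import psycopg2
from pgcopy import CopyManager
from psycopg2.extras import execute_values

from addok.config import config


def upsert(self, *docs):
    """Drop-in sketch for PSQLStore.upsert: COPY fast path, INSERT fallback."""
    with self.getconn() as conn, conn.cursor() as curs:
        mgr = CopyManager(conn, '{PG_TABLE}'.format(**config), ['key', 'data'])
        try:
            mgr.copy(docs)  # Fast path; fails if any key already exists.
        except psycopg2.Error:
            # The failed COPY leaves the transaction aborted; roll it back
            # before retrying with the slower, conflict-tolerant INSERT.
            conn.rollback()
            insert_into_query = '''
                INSERT INTO {PG_TABLE} (key, data) VALUES %s
                ON CONFLICT DO NOTHING
            '''.format(**config)
            execute_values(curs, insert_into_query, docs)
        # Leaving the `with conn` block commits whichever path succeeded.
```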
1 change: 1 addition & 0 deletions requirements.txt
@@ -1 +1,2 @@
 psycopg2==2.7.1
+pgcopy