Skip to content

Commit 4a2516d

Browse files
authored
Merge pull request arngarden#23 from quantopian/master
Upstreaming multiple changes Quantopian has been using internally
2 parents df418c0 + a8539f7 commit 4a2516d

File tree

8 files changed

+90
-100
lines changed

8 files changed

+90
-100
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ var/
1919
*.egg-info/
2020
.installed.cfg
2121
*.egg
22+
.eggs/
2223

2324
# Installer logs
2425
pip-log.txt

MANIFEST.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
include *.txt *.md
1+
include *.txt *.md *.rst

README.md

Lines changed: 0 additions & 31 deletions
This file was deleted.

README.rst

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
MongoDBProxy
2+
============
3+
4+
MongoDBProxy is used to create a proxy around a MongoDB-connection in order to
5+
automatically handle AutoReconnect-exceptions. You use MongoDBProxy in the
6+
same way you would an ordinary MongoDB-connection but don't need to worry about
7+
handling AutoReconnects yourself.
8+
9+
Usage::
10+
11+
>>> import pymongo
12+
>>> import mongo_proxy
13+
>>> safe_conn = mongo_proxy.MongoProxy(pymongo.MongoReplicaSetClient(replicaSet='blog_rs'))
14+
>>> safe_conn.blogs.posts.insert(post) # Automatically handles AutoReconnect.
15+
16+
**See here for more details:**
17+
`<http://www.arngarden.com/2013/04/29/handling-mongodb-autoreconnect-exceptions-in-python-using-a-proxy/>`_
18+
19+
**Contributors**:
20+
21+
- Jonathan Kamens (`<https://github.com/jikamens>`_)
22+
- Michael Cetrulo (`<https://github.com/git2samus>`_)
23+
- Richard Frank (`<https://github.com/richafrank>`_)
24+
- David Lindquist (`<https://github.com/dlindquist>`_)

mongo_proxy/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,8 @@
11
from .mongodb_proxy import MongoProxy
22
from .durable_cursor import DurableCursor, MongoReconnectFailure
3+
4+
__all__ = [
5+
'MongoProxy',
6+
'DurableCursor',
7+
'MongoReconnectFailure',
8+
]

mongo_proxy/durable_cursor.py

Lines changed: 50 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,26 @@
1515
"""
1616

1717
import logging
18-
import sys
1918
import time
20-
from pymongo.errors import AutoReconnect, OperationFailure
19+
from pymongo.errors import (
20+
AutoReconnect,
21+
CursorNotFound,
22+
ExecutionTimeout,
23+
OperationFailure,
24+
WTimeoutError,
25+
)
2126

2227
# How long we are willing to attempt to reconnect when the replicaset
2328
# fails over. We double the delay between each attempt.
2429
MAX_RECONNECT_TIME = 60
2530
MAX_SLEEP = 5
2631
RECONNECT_INITIAL_DELAY = 1
32+
RETRYABLE_OPERATION_FAILURE_CLASSES = (
33+
AutoReconnect, # AutoReconnect is raised when the primary node fails
34+
CursorNotFound,
35+
ExecutionTimeout,
36+
WTimeoutError,
37+
)
2738

2839

2940
class MongoReconnectFailure(Exception):
@@ -136,46 +147,41 @@ def reload_cursor(self):
136147
def alive(self):
137148
return self.tailable and self.cursor.alive
138149

139-
def next(self):
140-
try:
141-
next_record = self.cursor.next()
142-
143-
# OperationFailure is raised when an operation fails inside
144-
# the remote DB. This most commonly occurs when our cursor
145-
# has been inactive for 10 minutes or more.
146-
except OperationFailure as exc:
147-
self.logger.info("""
148-
Attempting to handle cursor timeout.
149-
OperationFailure exception catches a lot of failure cases.
150-
The current exception is actually:
151-
{exc}
152-
TODO: Inspect the exc name and only reload the cursor on timeouts.
150+
def __next__(self):
151+
next_record = self._with_retry(get_next=True, f=lambda: next(self.cursor))
152+
# Increment count before returning so we know how many records
153+
# to skip if a failure occurs later.
154+
self.counter += 1
155+
return next_record
153156

154-
The query spec that timed out was:
155-
{spec}
156-
""".strip().format(exc=exc, spec=self.spec))
157+
next = __next__
157158

159+
def _with_retry(self, get_next, f, *args, **kwargs):
160+
try:
161+
next_record = f(*args, **kwargs)
162+
except RETRYABLE_OPERATION_FAILURE_CLASSES as exc:
163+
self.logger.info(
164+
"Got {!r}; attempting recovery. The query spec was: {}"
165+
.format(exc, self.spec)
166+
)
158167
# Try to reload the cursor and continue where we left off
159-
self.reload_cursor()
160-
next_record = self.cursor.next()
161-
self.logger.info("Cursor reload after timeout successful.")
162-
163-
# AutoReconnect is raised when the primary node fails and we
164-
# attempt to reconnect to the replica set.
165-
except AutoReconnect:
166-
self.logger.info("Got AutoReconnect; attempting recovery",
167-
exc_info=sys.exc_info())
168-
169-
# Try for up to self.max_reconnect_time to reconnect to
170-
# the replicaset before giving up. If the reconnect is
171-
# successful, we return success == True along with the
172-
# next record to return. Otherwise we return (False,
173-
# None).
174-
next_record = self.try_reconnect()
168+
next_record = self.try_reconnect(get_next=get_next)
169+
self.logger.info("Cursor reload after {!r} successful."
170+
.format(exc))
171+
172+
except OperationFailure as exc:
173+
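# There is no dedicated exception subclass for a server shutdown interruption; it arrives as a plain OperationFailure, so match on the message text: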
174+
if 'interrupted at shutdown' in str(exc.args[0]):
175+
self.logger.info(
176+
"Got {!r}; attempting recovery. The query spec was: {}"
177+
.format(exc, self.spec)
178+
)
179+
next_record = self.try_reconnect(get_next=get_next)
180+
self.logger.info("Cursor reload after {!r} successful."
181+
.format(exc))
182+
else:
183+
raise
175184

176-
# Increment count before returning so we know how many records
177-
# to skip if a failure occurs later.
178-
self.counter += 1
179185
return next_record
180186

181187
def try_reconnect(self, get_next=True):
@@ -196,7 +202,7 @@ def try_reconnect(self, get_next=True):
196202
try:
197203
# Attempt to reload and get the next batch.
198204
self.reload_cursor()
199-
return self.cursor.next() if get_next else True
205+
return next(self.cursor) if get_next else True
200206

201207
# Replica set hasn't come online yet.
202208
except AutoReconnect:
@@ -224,9 +230,8 @@ def try_reconnect(self, get_next=True):
224230
raise MongoReconnectFailure()
225231

226232
def count(self, with_limit_and_skip=False):
227-
while True:
228-
try:
229-
return self.cursor.count(
230-
with_limit_and_skip=with_limit_and_skip)
231-
except AutoReconnect:
232-
self.try_reconnect(get_next=False)
233+
return self._with_retry(
234+
get_next=False,
235+
f=self.cursor.count,
236+
with_limit_and_skip=with_limit_and_skip,
237+
)

mongo_proxy/mongodb_proxy.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,16 @@
1717
import time
1818
import pymongo
1919

20+
2021
def get_methods(*objs):
2122
return set(
2223
attr
2324
for obj in objs
2425
for attr in dir(obj)
25-
if not attr.startswith('_')
26-
and hasattr(getattr(obj, attr), '__call__')
26+
if not attr.startswith('_') and hasattr(getattr(obj, attr), '__call__')
2727
)
2828

29+
2930
try:
3031
# will fail to import from older versions of pymongo
3132
from pymongo import MongoClient, MongoReplicaSetClient
@@ -57,7 +58,7 @@ def get_connection(obj):
5758
return None
5859

5960

60-
class Executable:
61+
class Executable(object):
6162
""" Wrap a MongoDB-method and handle AutoReconnect-exceptions
6263
using the safe_mongocall decorator.
6364
"""
@@ -118,7 +119,8 @@ def __str__(self):
118119
def __repr__(self):
119120
return self.method.__repr__()
120121

121-
class MongoProxy:
122+
123+
class MongoProxy(object):
122124
""" Proxy for MongoDB connection.
123125
Methods that are executable, i.e find, insert etc, get wrapped in an
124126
Executable-instance that handles AutoReconnect-exceptions transparently.

setup.py

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,8 @@
1616

1717
from setuptools import setup, find_packages
1818

19-
LONG_DESCRIPTION = None
20-
README_MARKDOWN = None
21-
22-
with open('README.md') as markdown_source:
23-
README_MARKDOWN = markdown_source.read()
24-
25-
try:
26-
import pandoc
27-
pandoc.core.PANDOC_PATH = 'pandoc'
28-
# Converts the README.md file to ReST, since PyPI uses ReST for formatting,
29-
# This allows to have one canonical README file, being the README.md
30-
doc = pandoc.Document()
31-
doc.markdown = README_MARKDOWN
32-
LONG_DESCRIPTION = doc.rst
33-
except ImportError:
34-
# If pandoc isn't installed, e.g. when downloading from pip,
35-
# just use the regular README.
36-
LONG_DESCRIPTION = README_MARKDOWN
19+
with open('README.rst') as readme_file:
20+
LONG_DESCRIPTION = readme_file.read()
3721

3822
setup(
3923
name='MongoDBProxy',
@@ -45,7 +29,6 @@
4529
classifiers=[
4630
'License :: OSI Approved :: Apache Software License',
4731
],
48-
setup_requires=['pyandoc'],
4932
install_requires=['pymongo'],
5033
url="https://github.com/arngarden/MongoDBProxy"
5134
)

0 commit comments

Comments
 (0)