Skip to content

Commit

Permalink
Merge branch 'master' of github.com:jessepollak/mixpanel-python-async
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse Pollak committed Apr 29, 2014
2 parents 38e758d + 0baf3c6 commit 485a3b3
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 7 deletions.
35 changes: 30 additions & 5 deletions mixpanel_async/async_buffered_consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import absolute_import
import json
import copy
import threading
from datetime import datetime, timedelta
from mixpanel import BufferedConsumer as SynchronousBufferedConsumer
Expand Down Expand Up @@ -78,6 +79,8 @@ def __init__(self, flush_after=timedelta(0, 10), flush_first=True, max_size=20,
self.flush_after = flush_after
self.flush_first = flush_first

self._async_buffers = copy.deepcopy(self._buffers)

if not self.flush_first:
self.last_flushed = datetime.now()
else:
Expand Down Expand Up @@ -106,7 +109,7 @@ def _should_flush(self, endpoint=None):
full = False

if endpoint:
full = len(self._buffers[endpoint]) >= self._max_size
full = len(self._async_buffers[endpoint]) >= self._max_size

# always flush the first event
stale = self.last_flushed is None
Expand Down Expand Up @@ -145,10 +148,10 @@ def send(self, endpoint, json_message):
:type json_message: str
:raises: MixpanelException
'''
if endpoint not in self._buffers:
raise MixpanelException('No such endpoint "{0}". Valid endpoints are one of {1}'.format(self._buffers.keys()))
if endpoint not in self._async_buffers:
raise MixpanelException('No such endpoint "{0}". Valid endpoints are one of {1}'.format(self._async_buffers.keys()))

buf = self._buffers[endpoint]
buf = self._async_buffers[endpoint]
buf.append(json_message)

should_flush = self._should_flush(endpoint)
Expand Down Expand Up @@ -186,6 +189,8 @@ def flush(self, endpoint=None, async=True):
with self.flush_lock:
if self._flush_thread_is_free():

self.transfer_buffers(endpoint=endpoint)

self.flushing_thread = FlushThread(self, endpoint=endpoint)
self.flushing_thread.start()

Expand All @@ -206,7 +211,8 @@ def flush(self, endpoint=None, async=True):
flushing = False

else:
self._sync_flush()
self.transfer_buffers(endpoint=endpoint)
self._sync_flush(endpoint=endpoint)
flushing = True

if flushing:
Expand All @@ -215,6 +221,25 @@ def flush(self, endpoint=None, async=True):
return flushing


def transfer_buffers(self, endpoint=None):
"""
Transfer events from the `_async_buffers` where they are stored to the
`_buffers` where they will be flushed from by the flushing thread.
:param endpoint (str): One of 'events' or 'people, the Mixpanel endpoint
that is about to be flushed
"""
if endpoint:
keys = [endpoint]
else:
keys = self._async_buffers.keys()

for key in keys:
buf = self._async_buffers[key]
while buf:
self._buffers[key].append(buf.pop(0))


def _flush_endpoint(self, endpoint, async=True):
# we override flush with endpoint so as to keep all the
# threading logic in one place, while still allowing individual
Expand Down
20 changes: 18 additions & 2 deletions tests/async_buffered_consumer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
try:
from mock import Mock, patch, DEFAULT
except ImportError:
print 'mixpanel-python requires the mock package to run the test suite'
raise
raise Exception(
"""
mixpanel-python-async requires the mock package to run the test suite.
Please run:
$ pip install mock
""")

from mixpanel_async import AsyncBufferedConsumer

Expand Down Expand Up @@ -129,6 +134,17 @@ def test_endpoint_events_get_flushed_instantly_with_max_size_1(self, sync_flush)

sync_flush.assert_called_once_with(endpoint=self.ENDPOINT)

def test_does_not_drop_events(self):
self.consumer = AsyncBufferedConsumer(flush_first=True)
send_patch = patch.object(self.consumer._consumer, 'send').start()

self.send_event()
self.send_event()

self.wait_for_threads()

send_patch.assert_called_once_with(self.ENDPOINT, '[{"test": true}]')
self.assertEqual(self.consumer._async_buffers[self.ENDPOINT], [self.JSON])

def send_event(self, endpoint=None):
endpoint = endpoint or self.ENDPOINT
Expand Down

0 comments on commit 485a3b3

Please sign in to comment.