diff --git a/README.md b/README.md index bddd90a..0774be6 100755 --- a/README.md +++ b/README.md @@ -13,9 +13,6 @@ going out to third parties and sends to [Moesif](https://www.moesif.com) for API This SDK uses the Requests library and will work for Python 2.7 — 3.5. -For high volume APIs, you can [enable Celery](#use_celery) which offloads the logging in a separate task. -Install celery via `pip install celery` and set `USE_CELERY` to True - ## How to install ```shell @@ -26,7 +23,6 @@ pip install moesifdjango In your `settings.py` file in your Django project directory, please add `moesifdjango.middleware.moesif_middleware` to the MIDDLEWARE array. -If you plan to use celery as the backend of asynchronous delivered logged requests, you also need to add `moesifdjango` to your `INSTALLED_APPS`. Because of middleware execution order, it is best to add moesifdjango middleware __below__ SessionMiddleware and AuthenticationMiddleware, because they add useful session data that enables deeper error analysis. On the other hand, if you have other middleware that modified response before going out, you may choose to place Moesif middleware __above__ the middleware modifying response. This allows Moesif to see the modifications to the response data and see closer to what is going over the wire. @@ -87,7 +83,7 @@ After signing up for a Moesif account, your Moesif Application Id will be displa You can always find your Moesif Application Id at any time by logging into the [_Moesif Portal_](https://www.moesif.com/), click on the top right menu, -and then clicking _Installation_. +and then clicking _API Keys_. ## Configuration options @@ -118,6 +114,12 @@ to add custom metadata that will be associated with the event. The metadata must #### __`MASK_EVENT_MODEL`__ (optional) _(EventModel) => EventModel_, a function that takes an EventModel and returns an EventModel with desired data removed. Use this if you prefer to write your own mask function than use the string based filter options: REQUEST_BODY_MASKS, REQUEST_HEADER_MASKS, RESPONSE_BODY_MASKS, & RESPONSE_HEADER_MASKS. The return value must be a valid EventModel required by Moesif data ingestion API. For details regarding EventModel please see the [Moesif Python API Documentation](https://www.moesif.com/docs/api?python). +#### __`BATCH_SIZE`__ +(optional) __int__, default 25, Maximum batch size when sending to Moesif. + +#### __`EVENT_QUEUE_SIZE`__ +(optional) __int__, default 10000, Maximum number of events to hold in queue before sending to Moesif. In case of network issues when not able to connect/send event to Moesif, skips adding new to event to queue to prevent memory overflow. + #### __`AUTHORIZATION_HEADER_NAME`__ (optional) _string_, A request header field name used to identify the User in Moesif. Default value is `authorization`. Also, supports a comma separated string. We will check headers in order like `"X-Api-Key,Authorization"`. @@ -158,28 +160,6 @@ but different frameworks and your implementation might be very different, it wou ##### __`LOG_BODY_OUTGOING`__ (optional) _boolean_, default True, Set to False to remove logging request and response body. -#### __`USE_CELERY`__ -_boolean_, Default False. Set to True to use Celery for queuing sending data to Moesif. Check out [Celery documentation](http://docs.celeryproject.org) for more info. - -##### How to use Celery - -__Because celery is optional, moesifdjango does not prepackage Celery as a dependency. -Make sure you install celery via `pip install celery`__ - -Install celery and redis with `pip install "celery[redis]"` - -*Please Note:* If you're using Celery 3.1 or earlier, install celery and redis with `pip install celery==3.1.25` and `pip install redis==2.10.6` - -Set the configuration option to `USE_CELERY` to `True`. - -```python -MOESIF_MIDDLEWARE = { - 'USE_CELERY': True -} -``` - -Start the celery worker with `celery -A worker --loglevel=debug` - #### __`LOCAL_DEBUG`__ _boolean_, set to True to print internal log messages for debugging SDK integration issues. @@ -231,7 +211,6 @@ MOESIF_MIDDLEWARE = { 'SKIP': should_skip, 'MASK_EVENT_MODEL': mask_event, 'GET_METADATA': get_metadata, - 'USE_CELERY': False } ``` @@ -406,27 +385,26 @@ update_companies = middleware.update_companies_batch([userA, userB]) ## Tested versions -Moesif has validated moesifdjango against the following combinations. -Using the Celery queing service is optional, but can be enabled to enable higher performance. - -| Python | Django | Celery | Redis | Test with Celery | Test w/o Celery | -| ------------ | ------- | ------ | ------ | ---------------- | --------------- | -| Python 2.7 | 1.11.22 | 3.1.25 | 2.10.6 | Yes | Yes | -| Python 2.7 | 1.11.22 | 4.3.0 | 3.2.1 | Yes | Yes | -| Python 2.7 | 1.9 | | | | Yes | -| Python 3.4.5 | 1.11.22 | 3.1.25 | 2.10.6 | | Yes | -| Python 3.4.5 | 1.11.22 | 4.3.0 | 3.2.1 | | Yes | -| Python 3.4.5 | 1.9 | | | Yes | | -| Python 3.6.4 | 1.11.22 | 3.1.25 | 2.10.6 | Yes | Yes | -| Python 3.6.4 | 1.11.22 | 4.3.0 | 3.2.1 | Yes | Yes | -| Python 3.6.4 | 1.9 | | | | Yes | +Moesif has validated moesifdjango against the following combinations. + +| Python | Django | +| ------------ | ------- | +| Python 2.7 | 1.11.22 | +| Python 2.7 | 1.11.22 | +| Python 2.7 | 1.9 | +| Python 3.4.5 | 1.11.22 | +| Python 3.4.5 | 1.11.22 | +| Python 3.4.5 | 1.9 | +| Python 3.6.4 | 1.11.22 | +| Python 3.6.4 | 1.11.22 | +| Python 3.6.4 | 1.9 | ## How to test 1. Manually clone the git repo 2. Invoke `pip install Django` if you haven't done so. 3. Invoke `pip install moesifdjango` -3. Add your own application id to 'tests/settings.py'. You can find your Application Id from [_Moesif Dashboard_](https://www.moesif.com/) -> _Top Right Menu_ -> _Installation_ +3. Add your own application id to 'tests/settings.py'. You can find your Application Id from [_Moesif Dashboard_](https://www.moesif.com/) -> _Top Right Menu_ -> _API Keys_ 4. From terminal/cmd navigate to the root directory of the middleware tests. 5. Invoke `python manage.py test` if you are using Django 1.10 or newer. 6. Invoke `python manage.py test middleware_pre19_tests` if you are using Django 1.9 or older. diff --git a/moesifdjango/job_scheduler.py b/moesifdjango/job_scheduler.py new file mode 100644 index 0000000..befe961 --- /dev/null +++ b/moesifdjango/job_scheduler.py @@ -0,0 +1,69 @@ +from datetime import datetime +from .app_config import AppConfig + +class JobScheduler: + + def __init__(self): + self.app_config = AppConfig() + + @classmethod + def exit_handler(cls, scheduler, debug): + try: + # Shut down the scheduler + scheduler.shutdown() + except: + if debug: + print("Error while closing the queue or scheduler shut down") + + def send_events(self, api_client, batch_events, debug): + try: + if debug: + print("Sending events to Moesif") + batch_events_api_response = api_client.create_events_batch(batch_events) + if debug: + print("Events sent successfully") + # Fetch Config ETag from response header + batch_events_response_config_etag = batch_events_api_response.get("X-Moesif-Config-ETag") + # Return Config Etag + return batch_events_response_config_etag + except Exception as ex: + if debug: + print("Error sending event to Moesif") + print(str(ex)) + return None + + # Function to fetch application config + def fetch_app_config(self, config, config_etag, sampling_percentage, last_updated_time, api_client, debug): + try: + config = self.app_config.get_config(api_client, debug) + if config: + config_etag, sampling_percentage, last_updated_time = self.app_config.parse_configuration(config, debug) + except Exception as e: + if debug: + print('Error while fetching the application configuration') + print(str(e)) + return config, config_etag, sampling_percentage, last_updated_time + + def batch_events(self, api_client, moesif_events_queue, debug, batch_size): + # Set the last time event job ran + last_event_job_run_time = datetime.utcnow() + + batch_events = [] + try: + while not moesif_events_queue.empty(): + batch_events.append(moesif_events_queue.get_nowait()) + if len(batch_events) == batch_size: + break + + if batch_events: + batch_response = self.send_events(api_client, batch_events, debug) + batch_events[:] = [] + return batch_response, last_event_job_run_time + else: + if debug: + print("No events to send") + return None, last_event_job_run_time + except: + if debug: + print("No message to read from the queue") + return None, last_event_job_run_time diff --git a/moesifdjango/middleware.py b/moesifdjango/middleware.py index 2a4ac61..b7f6256 100644 --- a/moesifdjango/middleware.py +++ b/moesifdjango/middleware.py @@ -7,6 +7,7 @@ import logging import random import math +import queue from django.conf import settings from django.utils import timezone from moesifapi.moesif_api_client import * @@ -28,41 +29,11 @@ from .client_ip import ClientIp from .logger_helper import LoggerHelper from .event_mapper import EventMapper - -# Logger Config -logging.basicConfig() -logger = logging.getLogger('logger') - -CELERY = False -if settings.MOESIF_MIDDLEWARE.get('USE_CELERY', False): - try: - import celery - from .tasks import async_client_create_event - from kombu import Connection - try: - BROKER_URL = settings.BROKER_URL - if BROKER_URL: - CELERY = True - else: - CELERY = False - except AttributeError: - BROKER_URL = settings.MOESIF_MIDDLEWARE.get('CELERY_BROKER_URL', None) - if BROKER_URL: - CELERY = True - else: - logger.warning("USE_CELERY flag was set to TRUE, but BROKER_URL not found") - CELERY = False - - try: - conn = Connection(BROKER_URL) - simple_queue = conn.SimpleQueue('moesif_events_queue') - except: - logger.error("Error while connecting to - {0}".format(BROKER_URL)) - - except: - logger.warning("USE_CELERY flag was set to TRUE, but celery package not found.") - CELERY = False - +from .job_scheduler import JobScheduler +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED +import atexit class moesif_middleware: def __init__(self, get_response): @@ -75,6 +46,7 @@ def __init__(self, get_response): # below comment for setting moesif base_uri to a test server. if self.middleware_settings.get('LOCAL_DEBUG', False): Configuration.BASE_URI = self.middleware_settings.get('LOCAL_MOESIF_BASEURL', 'https://api.moesif.net') + Configuration.version = 'moesifdjango-python/2.0.0' if settings.MOESIF_MIDDLEWARE.get('CAPTURE_OUTGOING_REQUESTS', False): try: if self.DEBUG: @@ -82,7 +54,7 @@ def __init__(self, get_response): # Start capturing outgoing requests StartCapture().start_capture_outgoing(settings.MOESIF_MIDDLEWARE) except: - logger.error('Error while starting to capture the outgoing events') + print('Error while starting to capture the outgoing events') self.api_version = self.middleware_settings.get('API_VERSION') self.api_client = self.client.api self.response_catcher = HttpResponseCatcher() @@ -91,6 +63,7 @@ def __init__(self, get_response): self.client_ip = ClientIp() self.logger_helper = LoggerHelper() self.event_mapper = EventMapper() + self.job_scheduler = JobScheduler() self.mask_helper = MaskData() self.user = User() self.company = Company() @@ -98,12 +71,76 @@ def __init__(self, get_response): self.sampling_percentage = 100 self.config_etag = None self.last_updated_time = datetime.utcnow() + self.last_event_job_run_time = datetime.utcnow() + self.scheduler = BackgroundScheduler(daemon=True) + self.event_queue_size = self.middleware_settings.get('EVENT_QUEUE_SIZE', 10000) + self.mo_events_queue = queue.Queue(maxsize=self.event_queue_size) + self.event_batch_size = self.middleware_settings.get('BATCH_SIZE', 25) + self.is_event_job_scheduled = False try: if self.config: self.config_etag, self.sampling_percentage, self.last_updated_time = self.app_config.parse_configuration(self.config, self.DEBUG) - except: + except Exception as e: if self.DEBUG: - logger.error('Error while parsing application configuration on initialization') + print('Error while parsing application configuration on initialization') + print(str(e)) + try: + self.schedule_event_background_job() + self.is_event_job_scheduled = True + except Exception as ex: + self.is_event_job_scheduled = False + if self.DEBUG: + print('Error while starting the event scheduler job in background') + print(str(ex)) + + # Function to listen to the send event job response + def event_listener(self, event): + if event.exception: + if self.DEBUG: + print('Error reading response from the scheduled event job') + else: + if event.retval: + response_etag, self.last_event_job_run_time = event.retval + if response_etag is not None \ + and self.config_etag is not None \ + and self.config_etag != response_etag \ + and datetime.utcnow() > self.last_updated_time + timedelta(minutes=5): + try: + self.config, self.config_etag, self.sampling_percentage, self.last_updated_time = \ + self.job_scheduler.fetch_app_config(self.config, self.config_etag, self.sampling_percentage, + self.last_updated_time, self.api_client, self.DEBUG) + except Exception as ex: + if self.DEBUG: + print('Error while updating the application configuration') + print(str(ex)) + + # Function to schedule send event job in async + def schedule_event_background_job(self): + try: + if not self.scheduler.get_jobs(): + + self.scheduler.add_listener(self.event_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) + self.scheduler.start() + self.scheduler.add_job( + func=lambda: self.job_scheduler.batch_events(self.api_client, self.mo_events_queue, self.DEBUG, + self.event_batch_size), + trigger=IntervalTrigger(seconds=2), + id='moesif_events_batch_job', + name='Schedule events batch job every 2 second', + replace_existing=True) + + # Avoid passing logging message to the ancestor loggers + logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING) + logging.getLogger('apscheduler.executors.default').propagate = False + + # Exit handler when exiting the app + atexit.register(lambda: self.job_scheduler.exit_handler(self.scheduler, self.DEBUG)) + else: + self.last_event_job_run_time = datetime.utcnow() + except Exception as ex: + if self.DEBUG: + print("Error when scheduling the job") + print(str(ex)) def __call__(self, request): # Code to be executed for each request before @@ -189,52 +226,23 @@ def __call__(self, request): # Mask Event Model event_model = self.logger_helper.mask_event(event_model, self.middleware_settings, self.DEBUG) - def sending_event(): - if self.DEBUG: - print("sending event to moesif") - try: - if not CELERY: - event_api_response = self.api_client.create_event(event_model) - event_response_config_etag = event_api_response.get("X-Moesif-Config-ETag") - if event_response_config_etag is not None \ - and self.config_etag is not None \ - and self.config_etag != event_response_config_etag \ - and datetime.utcnow() > self.last_updated_time + timedelta(minutes=5): - try: - self.config = self.app_config.get_config(self.api_client, self.DEBUG) - self.config_etag, self.sampling_percentage, self.last_updated_time = self.app_config.parse_configuration(self.config, self.DEBUG) - except: - if self.DEBUG: - logger.error('Error while updating the application configuration') - else: - try: - message = event_model.to_dictionary() - simple_queue.put(message) - if self.DEBUG: - print("Event added to the queue") - except: - if self.DEBUG: - print("Error while adding event to the queue") - if self.DEBUG: - print("Event sent successfully") - except APIException as inst: - if 401 <= inst.response_code <= 403: - print("Unauthorized access sending event to Moesif. Please check your Appplication Id.") - if self.DEBUG: - print("Error sending event to Moesif, with status code:") - print(inst.response_code) - + # Create random percentage random_percentage = random.random() * 100 - self.sampling_percentage = self.app_config.get_sampling_percentage(self.config, user_id, company_id) if self.sampling_percentage >= random_percentage: event_model.weight = 1 if self.sampling_percentage == 0 else math.floor(100 / self.sampling_percentage) - if CELERY: - sending_event() - else: - # send the event to moesif via background so not blocking - sending_background_thread = threading.Thread(target=sending_event) - sending_background_thread.start() + try: + if self.is_event_job_scheduled and datetime.utcnow() < self.last_event_job_run_time + timedelta(minutes=5): + if self.DEBUG: + print("Add Event to the queue") + self.mo_events_queue.put(event_model) + else: + self.schedule_event_background_job() + self.is_event_job_scheduled = True + except Exception as ex: + if self.DEBUG: + print("Error while adding event to the queue") + print(str(ex)) return response diff --git a/moesifdjango/middleware_pre19.py b/moesifdjango/middleware_pre19.py index 9ad9f6d..b46b706 100644 --- a/moesifdjango/middleware_pre19.py +++ b/moesifdjango/middleware_pre19.py @@ -4,6 +4,8 @@ import threading import random import math +import queue +import logging from django.conf import settings from django.utils import timezone from moesifapi.moesif_api_client import * @@ -26,6 +28,11 @@ from .client_ip import ClientIp from .logger_helper import LoggerHelper from .event_mapper import EventMapper +from .job_scheduler import JobScheduler +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED +import atexit class MoesifMiddlewarePre19(object): @@ -38,6 +45,7 @@ def __init__(self): # below comment for setting moesif base_uri to a test server. if self.middleware_settings.get('LOCAL_DEBUG', False): Configuration.BASE_URI = self.middleware_settings.get('LOCAL_MOESIF_BASEURL', 'https://api.moesif.net') + Configuration.version = 'moesifdjango-python/2.0.0' if settings.MOESIF_MIDDLEWARE.get('CAPTURE_OUTGOING_REQUESTS', False): try: if self.DEBUG: @@ -55,6 +63,7 @@ def __init__(self): self.client_ip = ClientIp() self.logger_helper = LoggerHelper() self.event_mapper = EventMapper() + self.job_scheduler = JobScheduler() self.mask_helper = MaskData() self.user = User() self.company = Company() @@ -62,6 +71,12 @@ def __init__(self): self.sampling_percentage = 100 self.config_etag = None self.last_updated_time = datetime.utcnow() + self.last_event_job_run_time = datetime.utcnow() + self.scheduler = BackgroundScheduler(daemon=True) + self.event_queue_size = self.middleware_settings.get('EVENT_QUEUE_SIZE', 10000) + self.mo_events_queue = queue.Queue(maxsize=self.event_queue_size) + self.event_batch_size = self.middleware_settings.get('BATCH_SIZE', 25) + self.is_event_job_scheduled = False try: if self.config: self.config_etag, self.sampling_percentage, self.last_updated_time = self.app_config.parse_configuration( @@ -70,6 +85,62 @@ def __init__(self): if self.DEBUG: print('Error while parsing application configuration on initialization') self.transaction_id = None + try: + self.schedule_event_background_job() + self.is_event_job_scheduled = True + except Exception as ex: + self.is_event_job_scheduled = False + if self.DEBUG: + print('Error while starting the event scheduler job in background') + print(str(ex)) + + # Function to listen to the send event job response + def event_listener(self, event): + if event.exception: + if self.DEBUG: + print('Error reading response from the scheduled event job') + else: + if event.retval: + response_etag, self.last_event_job_run_time = event.retval + if response_etag is not None \ + and self.config_etag is not None \ + and self.config_etag != response_etag \ + and datetime.utcnow() > self.last_updated_time + timedelta(minutes=5): + try: + self.config, self.config_etag, self.sampling_percentage, self.last_updated_time = \ + self.job_scheduler.fetch_app_config(self.config, self.config_etag, self.sampling_percentage, + self.last_updated_time, self.api_client, self.DEBUG) + except Exception as ex: + if self.DEBUG: + print('Error while updating the application configuration') + print(str(ex)) + + # Function to schedule send event job in async + def schedule_event_background_job(self): + try: + if not self.scheduler.get_jobs(): + self.scheduler.add_listener(self.event_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) + self.scheduler.start() + self.scheduler.add_job( + func=lambda: self.job_scheduler.batch_events(self.api_client, self.mo_events_queue, self.DEBUG, + self.event_batch_size), + trigger=IntervalTrigger(seconds=2), + id='moesif_events_batch_job', + name='Schedule events batch job every 2 second', + replace_existing=True) + + # Avoid passing logging message to the ancestor loggers + logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING) + logging.getLogger('apscheduler.executors.default').propagate = False + + # Exit handler when exiting the app + atexit.register(lambda: self.job_scheduler.exit_handler(self.scheduler, self.DEBUG)) + else: + self.last_event_job_run_time = datetime.utcnow() + except Exception as ex: + if self.DEBUG: + print("Error when scheduling the job") + print(str(ex)) @classmethod def process_request(cls, request): @@ -156,40 +227,23 @@ def process_response(self, request, response): # Mask Event Model event_model = self.logger_helper.mask_event(event_model, self.middleware_settings, self.DEBUG) - def sending_event(): - if self.DEBUG: - print("sending event to moesif") - try: - event_api_response = self.api_client.create_event(event_model) - event_response_config_etag = event_api_response.get("X-Moesif-Config-ETag") - - if event_response_config_etag is not None \ - and self.config_etag is not None \ - and self.config_etag != event_response_config_etag \ - and datetime.utcnow() > self.last_updated_time + timedelta(minutes=5): - try: - self.config = self.app_config.get_config(self.api_client, self.DEBUG) - self.config_etag, self.sampling_percentage, self.last_updated_time = self.app_config.parse_configuration(self.config, self.DEBUG) - except: - if self.DEBUG: - print('Error while updating the application configuration') - if self.DEBUG: - print("Event sent successfully") - except APIException as inst: - if 401 <= inst.response_code <= 403: - print("Unauthorized access sending event to Moesif. Please check your Appplication Id.") - if self.DEBUG: - print("Error sending event to Moesif, with status code:") - print(inst.response_code) - + # Create random percentage random_percentage = random.random() * 100 - self.sampling_percentage = self.app_config.get_sampling_percentage(self.config, user_id, company_id) if self.sampling_percentage >= random_percentage: event_model.weight = 1 if self.sampling_percentage == 0 else math.floor(100 / self.sampling_percentage) - # send the event to moesif via background so not blocking - sending_background_thread = threading.Thread(target=sending_event) - sending_background_thread.start() + try: + if self.is_event_job_scheduled and datetime.utcnow() < self.last_event_job_run_time + timedelta(minutes=5): + if self.DEBUG: + print("Add Event to the queue") + self.mo_events_queue.put(event_model) + else: + self.schedule_event_background_job() + self.is_event_job_scheduled = True + except Exception as ex: + if self.DEBUG: + print("Error while adding event to the queue") + print(str(ex)) return response diff --git a/moesifdjango/tasks.py b/moesifdjango/tasks.py deleted file mode 100644 index 117bb85..0000000 --- a/moesifdjango/tasks.py +++ /dev/null @@ -1,151 +0,0 @@ -""" -__author__ = 'mpetyx (Michael Petychakis)' -__version__ = "1.0.0" -__maintainer__ = "Michael Petychakis" -__email__ = "michael@orfium.com" -__status__ = "Production" -""" - -from __future__ import absolute_import, unicode_literals -from django.conf import settings -from .http_response_catcher import HttpResponseCatcher -from celery import shared_task -from moesifapi.moesif_api_client import MoesifAPIClient -from moesifapi.models import EventModel -from .app_config import AppConfig -from datetime import datetime, timedelta - -middleware_settings = settings.MOESIF_MIDDLEWARE -client = MoesifAPIClient(middleware_settings.get('APPLICATION_ID')) -BATCH_SIZE = settings.MOESIF_MIDDLEWARE.get('BATCH_SIZE', 25) -DEBUG = middleware_settings.get('LOCAL_DEBUG', False) - -api_client = client.api -response_catcher = HttpResponseCatcher() -api_client.http_call_back = response_catcher - -def get_config(): - app_config = AppConfig() - config = app_config.get_config(api_client, DEBUG) - sampling_percentage = 100 - config_etag = None - last_updated_time = datetime.utcnow() - try: - if config: - config_etag, sampling_percentage, last_updated_time = app_config.parse_configuration(config, DEBUG) - except: - if DEBUG: - print('Error while parsing application configuration on initialization') - return config, config_etag, sampling_percentage, last_updated_time - -try: - get_broker_url = settings.BROKER_URL - if get_broker_url: - BROKER_URL = get_broker_url - else: - BROKER_URL = None -except AttributeError: - BROKER_URL = settings.MOESIF_MIDDLEWARE.get('CELERY_BROKER_URL', None) - - -def queue_get_all(queue_events): - events = [] - for num_of_events_retrieved in range(0, BATCH_SIZE): - try: - if num_of_events_retrieved == BATCH_SIZE: - break - events.append(queue_events.get_nowait()) - except: - break - return events - -@shared_task(ignore_result=True) -def async_client_create_event(moesif_events_queue, config, config_etag, last_updated_time): - batch_events = [] - try: - queue_size = moesif_events_queue.qsize() - while queue_size > 0: - message = queue_get_all(moesif_events_queue) - for event in message: - batch_events.append(EventModel().from_dictionary(event.payload)) - event.ack() - try: - queue_size = moesif_events_queue.qsize() - except ChannelError: - queue_size = 0 - except ChannelError: - if DEBUG: - print("No message to read from the queue") - - if batch_events: - if DEBUG: - print("Sending events to Moesif") - batch_events_api_response = api_client.create_events_batch(batch_events) - batch_events_response_config_etag = batch_events_api_response.get("X-Moesif-Config-ETag") - if batch_events_response_config_etag is not None \ - and config_etag is not None \ - and config_etag != batch_events_response_config_etag \ - and datetime.utcnow() > last_updated_time + timedelta(minutes=5): - - try: - config = config.get_config(api_client, DEBUG) - config_etag, sampling_percentage, last_updated_time = config.parse_configuration( - config, DEBUG) - except: - if DEBUG: - print('Error while updating the application configuration') - if DEBUG: - print("Events sent successfully") - else: - if DEBUG: - print("No events to send") - - -def exit_handler(moesif_events_queue, scheduler): - try: - # Close the queue - moesif_events_queue.close() - # Shut down the scheduler - scheduler.shutdown() - except: - if DEBUG: - print("Error while closing the queue or scheduler shut down") - -CELERY = False -if settings.MOESIF_MIDDLEWARE.get('USE_CELERY', False): - if BROKER_URL: - try: - from apscheduler.schedulers.background import BackgroundScheduler - from apscheduler.triggers.interval import IntervalTrigger - from kombu import Connection - from kombu.exceptions import ChannelError - import atexit - import logging - - scheduler = BackgroundScheduler(daemon=True) - scheduler.start() - config, config_etag, sampling_percentage, last_updated_time = get_config() - try: - conn = Connection(BROKER_URL) - moesif_events_queue = conn.SimpleQueue('moesif_events_queue') - scheduler.add_job( - func=lambda: async_client_create_event(moesif_events_queue, config, config_etag, last_updated_time), - trigger=IntervalTrigger(seconds=5), - id='moesif_events_batch_job', - name='Schedule events batch job every 5 second', - replace_existing=True) - - # Avoid passing logging message to the ancestor loggers - logging.getLogger('apscheduler.executors.default').propagate = False - - # Exit handler when exiting the app - atexit.register(lambda: exit_handler(moesif_events_queue, scheduler)) - except: - if DEBUG: - print("Error while connecting to - {0}".format(BROKER_URL)) - except: - if DEBUG: - print("Error when scheduling the job") - else: - if DEBUG: - print("Unable to schedule the job as the BROKER_URL is not provided") diff --git a/requirements.txt b/requirements.txt index 5f2aa75..7c76734 100755 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ jsonpickle==0.7.1 python-dateutil==2.5.3 nose==1.3.7 isodatetimehandler==1.0.2 -moesifapi==1.3.2 +moesifapi==1.3.3 celery>=3.1.25 moesifpythonrequest==0.2.0 apscheduler==3.6.1 diff --git a/setup.py b/setup.py index 88b04f0..8acfb2a 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ # Versions should comply with PEP440. For a discussion on single-sourcing # the version across setup.py and the project code, see # https://packaging.python.org/en/latest/single_source_version.html - version='1.7.8', + version='2.0.0', description='Moesif Middleware for Python Django', long_description=long_description,