From f32b1f96e1fe35b25bc9cffc8e53feaf1e429994 Mon Sep 17 00:00:00 2001 From: Keyur Date: Wed, 10 Feb 2021 19:42:58 -0800 Subject: [PATCH 1/3] Add: Support to send events in batch Add: Support to send events in batch Fix: Sampling events when using Celery Refactor: Remove logger and use print message Refactor: README.md Bump version to 1.3.9 --- README.md | 3 + moesifdjango/job_scheduler.py | 51 +++++++++ moesifdjango/middleware.py | 186 +++++++++++++++++++++++-------- moesifdjango/middleware_pre19.py | 108 +++++++++++++----- moesifdjango/tasks.py | 53 +++------ setup.py | 2 +- 6 files changed, 289 insertions(+), 114 deletions(-) create mode 100644 moesifdjango/job_scheduler.py diff --git a/README.md b/README.md index bddd90a..a609272 100755 --- a/README.md +++ b/README.md @@ -118,6 +118,9 @@ to add custom metadata that will be associated with the event. The metadata must #### __`MASK_EVENT_MODEL`__ (optional) _(EventModel) => EventModel_, a function that takes an EventModel and returns an EventModel with desired data removed. Use this if you prefer to write your own mask function than use the string based filter options: REQUEST_BODY_MASKS, REQUEST_HEADER_MASKS, RESPONSE_BODY_MASKS, & RESPONSE_HEADER_MASKS. The return value must be a valid EventModel required by Moesif data ingestion API. For details regarding EventModel please see the [Moesif Python API Documentation](https://www.moesif.com/docs/api?python). +#### __`BATCH_SIZE`__ +(optional) __int__, default 25, Maximum batch size when sending to Moesif. + #### __`AUTHORIZATION_HEADER_NAME`__ (optional) _string_, A request header field name used to identify the User in Moesif. Default value is `authorization`. Also, supports a comma separated string. We will check headers in order like `"X-Api-Key,Authorization"`. diff --git a/moesifdjango/job_scheduler.py b/moesifdjango/job_scheduler.py new file mode 100644 index 0000000..d72d546 --- /dev/null +++ b/moesifdjango/job_scheduler.py @@ -0,0 +1,51 @@ + +class JobScheduler: + + def __init__(self): + pass + + @classmethod + def exit_handler(cls, scheduler, debug): + try: + # Shut down the scheduler + scheduler.shutdown() + except: + if debug: + print("Error while closing the queue or scheduler shut down") + + @classmethod + def send_events(cls, api_client, batch_events, debug): + try: + if debug: + print("Sending events to Moesif") + batch_events_api_response = api_client.create_events_batch(batch_events) + if debug: + print("Events sent successfully") + # Fetch Config ETag from response header + batch_events_response_config_etag = batch_events_api_response.get("X-Moesif-Config-ETag") + # Return Config Etag + return batch_events_response_config_etag + except Exception as ex: + if debug: + print("Error sending event to Moesif") + print(str(ex)) + return None + + def batch_events(self, api_client, moesif_events_queue, debug, batch_size): + batch_events = [] + try: + while not moesif_events_queue.empty(): + batch_events.append(moesif_events_queue.get_nowait()) + if len(batch_events) == batch_size: + break + + if batch_events: + batch_response = self.send_events(api_client, batch_events, debug) + batch_events[:] = [] + return batch_response + else: + if debug: + print("No events to send") + except: + if debug: + print("No message to read from the queue") diff --git a/moesifdjango/middleware.py b/moesifdjango/middleware.py index 2a4ac61..11315c5 100644 --- a/moesifdjango/middleware.py +++ b/moesifdjango/middleware.py @@ -7,6 +7,7 @@ import logging import random import math +import queue from django.conf import settings from django.utils import timezone from moesifapi.moesif_api_client import * @@ -28,10 +29,7 @@ from .client_ip import ClientIp from .logger_helper import LoggerHelper from .event_mapper import EventMapper - -# Logger Config -logging.basicConfig() -logger = logging.getLogger('logger') +from .job_scheduler import JobScheduler CELERY = False if settings.MOESIF_MIDDLEWARE.get('USE_CELERY', False): @@ -50,17 +48,17 @@ if BROKER_URL: CELERY = True else: - logger.warning("USE_CELERY flag was set to TRUE, but BROKER_URL not found") + print("USE_CELERY flag was set to TRUE, but BROKER_URL not found") CELERY = False try: conn = Connection(BROKER_URL) simple_queue = conn.SimpleQueue('moesif_events_queue') except: - logger.error("Error while connecting to - {0}".format(BROKER_URL)) + print("Error while connecting to - {0}".format(BROKER_URL)) except: - logger.warning("USE_CELERY flag was set to TRUE, but celery package not found.") + print("USE_CELERY flag was set to TRUE, but celery package not found.") CELERY = False @@ -82,7 +80,7 @@ def __init__(self, get_response): # Start capturing outgoing requests StartCapture().start_capture_outgoing(settings.MOESIF_MIDDLEWARE) except: - logger.error('Error while starting to capture the outgoing events') + print('Error while starting to capture the outgoing events') self.api_version = self.middleware_settings.get('API_VERSION') self.api_client = self.client.api self.response_catcher = HttpResponseCatcher() @@ -91,6 +89,7 @@ def __init__(self, get_response): self.client_ip = ClientIp() self.logger_helper = LoggerHelper() self.event_mapper = EventMapper() + self.job_scheduler = JobScheduler() self.mask_helper = MaskData() self.user = User() self.company = Company() @@ -98,12 +97,116 @@ def __init__(self, get_response): self.sampling_percentage = 100 self.config_etag = None self.last_updated_time = datetime.utcnow() + self.mo_events_queue = queue.Queue() + self.event_batch_size = self.middleware_settings.get('BATCH_SIZE', 25) + self.is_event_job_scheduled = False + self.is_config_job_scheduled = False try: if self.config: self.config_etag, self.sampling_percentage, self.last_updated_time = self.app_config.parse_configuration(self.config, self.DEBUG) - except: + except Exception as e: + if self.DEBUG: + print('Error while parsing application configuration on initialization') + print(str(e)) + try: + if not CELERY: + self.schedule_event_background_job() + self.is_event_job_scheduled = True + except Exception as ex: + self.is_event_job_scheduled = False + if self.DEBUG: + print('Error while starting the event scheduler job in background') + print(str(ex)) + + # Function to listen to the send event job response + def event_listener(self, event): + if event.exception: + if self.DEBUG: + print('Error reading response from the scheduled event job') + else: + if event.retval: + if event.retval is not None \ + and self.config_etag is not None \ + and self.config_etag != event.retval \ + and datetime.utcnow() > self.last_updated_time + timedelta(minutes=5): + try: + self.fetch_app_config() + except Exception as ex: + if self.DEBUG: + print('Error while updating the application configuration') + print(str(ex)) + + # Function to schedule send event job in async + def schedule_event_background_job(self): + try: + from apscheduler.schedulers.background import BackgroundScheduler + from apscheduler.triggers.interval import IntervalTrigger + from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED + import atexit + + scheduler = BackgroundScheduler(daemon=True) + scheduler.add_listener(self.event_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) + scheduler.start() + try: + scheduler.add_job( + func=lambda: self.job_scheduler.batch_events(self.api_client, self.mo_events_queue, self.DEBUG, self.event_batch_size), + trigger=IntervalTrigger(seconds=2), + id='moesif_events_batch_job', + name='Schedule events batch job every 2 second', + replace_existing=True) + + # Exit handler when exiting the app + atexit.register(lambda: self.job_scheduler.exit_handler(scheduler, self.DEBUG)) + except Exception as ex: + if self.DEBUG: + print("Error while calling async function") + print(str(ex)) + except Exception as e: + if self.DEBUG: + print("Error when scheduling the job") + print(str(e)) + + # Function to fetch application config + def fetch_app_config(self): + try: + self.config = self.app_config.get_config(self.api_client, self.DEBUG) + if self.config: + self.config_etag, self.sampling_percentage, self.last_updated_time = self.app_config.parse_configuration(self.config, self.DEBUG) + except Exception as e: if self.DEBUG: - logger.error('Error while parsing application configuration on initialization') + print('Error while fetching the application configuration') + print(str(e)) + + def schedule_app_config_job(self): + try: + from apscheduler.schedulers.background import BackgroundScheduler + from apscheduler.triggers.interval import IntervalTrigger + import atexit + + scheduler = BackgroundScheduler(daemon=True) + scheduler.start() + try: + scheduler.add_job( + func=lambda: self.fetch_app_config(), + trigger=IntervalTrigger(minutes=5), + id='moesif_app_config_job', + name='Schedule app config job every 5 minutes', + replace_existing=True) + + # Avoid passing logging message to the ancestor loggers + logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING) + logging.getLogger('apscheduler.executors.default').propagate = False + + # Exit handler when exiting the app + atexit.register(lambda: self.job_scheduler.exit_handler(scheduler, self.DEBUG)) + except Exception as ex: + if self.DEBUG: + print("Error while calling app config async function") + print(str(ex)) + except Exception as e: + if self.DEBUG: + print("Error when scheduling the app config job") + print(str(e)) def __call__(self, request): # Code to be executed for each request before @@ -190,51 +293,46 @@ def __call__(self, request): event_model = self.logger_helper.mask_event(event_model, self.middleware_settings, self.DEBUG) def sending_event(): - if self.DEBUG: - print("sending event to moesif") try: - if not CELERY: - event_api_response = self.api_client.create_event(event_model) - event_response_config_etag = event_api_response.get("X-Moesif-Config-ETag") - if event_response_config_etag is not None \ - and self.config_etag is not None \ - and self.config_etag != event_response_config_etag \ - and datetime.utcnow() > self.last_updated_time + timedelta(minutes=5): - try: - self.config = self.app_config.get_config(self.api_client, self.DEBUG) - self.config_etag, self.sampling_percentage, self.last_updated_time = self.app_config.parse_configuration(self.config, self.DEBUG) - except: - if self.DEBUG: - logger.error('Error while updating the application configuration') - else: - try: - message = event_model.to_dictionary() - simple_queue.put(message) - if self.DEBUG: - print("Event added to the queue") - except: - if self.DEBUG: - print("Error while adding event to the queue") + message = event_model.to_dictionary() + simple_queue.put(message) if self.DEBUG: - print("Event sent successfully") - except APIException as inst: - if 401 <= inst.response_code <= 403: - print("Unauthorized access sending event to Moesif. Please check your Appplication Id.") + print("Event added to the queue") + except Exception as exc: if self.DEBUG: - print("Error sending event to Moesif, with status code:") - print(inst.response_code) + print("Error while adding event to the queue") + print(str(exc)) + # Create random percentage random_percentage = random.random() * 100 - self.sampling_percentage = self.app_config.get_sampling_percentage(self.config, user_id, company_id) if self.sampling_percentage >= random_percentage: event_model.weight = 1 if self.sampling_percentage == 0 else math.floor(100 / self.sampling_percentage) if CELERY: sending_event() + try: + if not self.is_config_job_scheduled: + self.schedule_app_config_job() + self.is_config_job_scheduled = True + except Exception as e: + if self.DEBUG: + print('Error while starting the app config scheduler job in background') + print(str(e)) + self.is_config_job_scheduled = False else: - # send the event to moesif via background so not blocking - sending_background_thread = threading.Thread(target=sending_event) - sending_background_thread.start() + try: + if self.is_event_job_scheduled: + if self.DEBUG: + print("Add Event to the queue") + self.mo_events_queue.put(event_model) + else: + self.schedule_event_background_job() + self.is_event_job_scheduled = True + except Exception as ex: + if self.DEBUG: + print("Error while adding event to the queue") + print(str(ex)) + self.is_event_job_scheduled = False return response diff --git a/moesifdjango/middleware_pre19.py b/moesifdjango/middleware_pre19.py index 9ad9f6d..2b6febe 100644 --- a/moesifdjango/middleware_pre19.py +++ b/moesifdjango/middleware_pre19.py @@ -4,6 +4,7 @@ import threading import random import math +import queue from django.conf import settings from django.utils import timezone from moesifapi.moesif_api_client import * @@ -26,6 +27,7 @@ from .client_ip import ClientIp from .logger_helper import LoggerHelper from .event_mapper import EventMapper +from .job_scheduler import JobScheduler class MoesifMiddlewarePre19(object): @@ -55,6 +57,7 @@ def __init__(self): self.client_ip = ClientIp() self.logger_helper = LoggerHelper() self.event_mapper = EventMapper() + self.job_scheduler = JobScheduler() self.mask_helper = MaskData() self.user = User() self.company = Company() @@ -62,6 +65,9 @@ def __init__(self): self.sampling_percentage = 100 self.config_etag = None self.last_updated_time = datetime.utcnow() + self.mo_events_queue = queue.Queue() + self.event_batch_size = self.middleware_settings.get('BATCH_SIZE', 25) + self.is_event_job_scheduled = False try: if self.config: self.config_etag, self.sampling_percentage, self.last_updated_time = self.app_config.parse_configuration( @@ -70,6 +76,64 @@ def __init__(self): if self.DEBUG: print('Error while parsing application configuration on initialization') self.transaction_id = None + try: + self.schedule_event_background_job() + self.is_event_job_scheduled = True + except Exception as ex: + self.is_event_job_scheduled = False + if self.DEBUG: + print('Error while starting the event scheduler job in background') + print(str(ex)) + + # Function to listen to the send event job response + def event_listener(self, event): + if event.exception: + if self.DEBUG: + print('Error reading response from the event scheduled job') + else: + if event.retval: + if event.retval is not None \ + and self.config_etag is not None \ + and self.config_etag != event.retval \ + and datetime.utcnow() > self.last_updated_time + timedelta(minutes=5): + try: + self.config = self.app_config.get_config(self.api_client, self.DEBUG) + self.config_etag, self.sampling_percentage, self.last_updated_time = self.app_config.parse_configuration( + self.config, self.DEBUG) + except Exception as ex: + if self.DEBUG: + print('Error while updating the application configuration') + print(str(ex)) + + # Function to schedule send event job in async + def schedule_event_background_job(self): + try: + from apscheduler.schedulers.background import BackgroundScheduler + from apscheduler.triggers.interval import IntervalTrigger + from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED + import atexit + + scheduler = BackgroundScheduler(daemon=True) + scheduler.add_listener(self.event_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) + scheduler.start() + try: + scheduler.add_job( + func=lambda: self.job_scheduler.batch_events(self.api_client, self.mo_events_queue, self.DEBUG, self.event_batch_size), + trigger=IntervalTrigger(seconds=2), + id='moesif_events_batch_job', + name='Schedule events batch job every 2 second', + replace_existing=True) + + # Exit handler when exiting the app + atexit.register(lambda: self.job_scheduler.exit_handler(scheduler, self.DEBUG)) + except Exception as ex: + if self.DEBUG: + print("Error while calling async function") + print(str(ex)) + except Exception as e: + if self.DEBUG: + print("Error when scheduling the event job") + print(str(e)) @classmethod def process_request(cls, request): @@ -156,40 +220,24 @@ def process_response(self, request, response): # Mask Event Model event_model = self.logger_helper.mask_event(event_model, self.middleware_settings, self.DEBUG) - def sending_event(): - if self.DEBUG: - print("sending event to moesif") - try: - event_api_response = self.api_client.create_event(event_model) - event_response_config_etag = event_api_response.get("X-Moesif-Config-ETag") - - if event_response_config_etag is not None \ - and self.config_etag is not None \ - and self.config_etag != event_response_config_etag \ - and datetime.utcnow() > self.last_updated_time + timedelta(minutes=5): - try: - self.config = self.app_config.get_config(self.api_client, self.DEBUG) - self.config_etag, self.sampling_percentage, self.last_updated_time = self.app_config.parse_configuration(self.config, self.DEBUG) - except: - if self.DEBUG: - print('Error while updating the application configuration') - if self.DEBUG: - print("Event sent successfully") - except APIException as inst: - if 401 <= inst.response_code <= 403: - print("Unauthorized access sending event to Moesif. Please check your Appplication Id.") - if self.DEBUG: - print("Error sending event to Moesif, with status code:") - print(inst.response_code) - + # Create random percentage random_percentage = random.random() * 100 - self.sampling_percentage = self.app_config.get_sampling_percentage(self.config, user_id, company_id) if self.sampling_percentage >= random_percentage: event_model.weight = 1 if self.sampling_percentage == 0 else math.floor(100 / self.sampling_percentage) - # send the event to moesif via background so not blocking - sending_background_thread = threading.Thread(target=sending_event) - sending_background_thread.start() + try: + if self.is_event_job_scheduled: + if self.DEBUG: + print("Add Event to the queue") + self.mo_events_queue.put(event_model) + else: + self.schedule_event_background_job() + self.is_event_job_scheduled = True + except Exception as ex: + if self.DEBUG: + print("Error while adding event to the queue") + print(str(ex)) + self.is_event_job_scheduled = False return response diff --git a/moesifdjango/tasks.py b/moesifdjango/tasks.py index 117bb85..2777368 100644 --- a/moesifdjango/tasks.py +++ b/moesifdjango/tasks.py @@ -12,32 +12,16 @@ from celery import shared_task from moesifapi.moesif_api_client import MoesifAPIClient from moesifapi.models import EventModel -from .app_config import AppConfig -from datetime import datetime, timedelta +# Initialization middleware_settings = settings.MOESIF_MIDDLEWARE client = MoesifAPIClient(middleware_settings.get('APPLICATION_ID')) BATCH_SIZE = settings.MOESIF_MIDDLEWARE.get('BATCH_SIZE', 25) DEBUG = middleware_settings.get('LOCAL_DEBUG', False) - api_client = client.api response_catcher = HttpResponseCatcher() api_client.http_call_back = response_catcher -def get_config(): - app_config = AppConfig() - config = app_config.get_config(api_client, DEBUG) - sampling_percentage = 100 - config_etag = None - last_updated_time = datetime.utcnow() - try: - if config: - config_etag, sampling_percentage, last_updated_time = app_config.parse_configuration(config, DEBUG) - except: - if DEBUG: - print('Error while parsing application configuration on initialization') - return config, config_etag, sampling_percentage, last_updated_time - try: get_broker_url = settings.BROKER_URL if get_broker_url: @@ -60,7 +44,7 @@ def queue_get_all(queue_events): return events @shared_task(ignore_result=True) -def async_client_create_event(moesif_events_queue, config, config_etag, last_updated_time): +def async_client_create_event(moesif_events_queue): batch_events = [] try: queue_size = moesif_events_queue.qsize() @@ -78,29 +62,20 @@ def async_client_create_event(moesif_events_queue, config, config_etag, last_upd print("No message to read from the queue") if batch_events: - if DEBUG: - print("Sending events to Moesif") - batch_events_api_response = api_client.create_events_batch(batch_events) - batch_events_response_config_etag = batch_events_api_response.get("X-Moesif-Config-ETag") - if batch_events_response_config_etag is not None \ - and config_etag is not None \ - and config_etag != batch_events_response_config_etag \ - and datetime.utcnow() > last_updated_time + timedelta(minutes=5): - - try: - config = config.get_config(api_client, DEBUG) - config_etag, sampling_percentage, last_updated_time = config.parse_configuration( - config, DEBUG) - except: - if DEBUG: - print('Error while updating the application configuration') - if DEBUG: - print("Events sent successfully") + try: + if DEBUG: + print("Sending events to Moesif") + api_client.create_events_batch(batch_events) + if DEBUG: + print("Events sent successfully") + except Exception as ex: + if DEBUG: + print("Error sending events to Moesif") + print(str(ex)) else: if DEBUG: print("No events to send") - def exit_handler(moesif_events_queue, scheduler): try: # Close the queue @@ -124,18 +99,18 @@ def exit_handler(moesif_events_queue, scheduler): scheduler = BackgroundScheduler(daemon=True) scheduler.start() - config, config_etag, sampling_percentage, last_updated_time = get_config() try: conn = Connection(BROKER_URL) moesif_events_queue = conn.SimpleQueue('moesif_events_queue') scheduler.add_job( - func=lambda: async_client_create_event(moesif_events_queue, config, config_etag, last_updated_time), + func=lambda: async_client_create_event(moesif_events_queue), trigger=IntervalTrigger(seconds=5), id='moesif_events_batch_job', name='Schedule events batch job every 5 second', replace_existing=True) # Avoid passing logging message to the ancestor loggers + logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING) logging.getLogger('apscheduler.executors.default').propagate = False # Exit handler when exiting the app diff --git a/setup.py b/setup.py index 88b04f0..e6e699e 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ # Versions should comply with PEP440. For a discussion on single-sourcing # the version across setup.py and the project code, see # https://packaging.python.org/en/latest/single_source_version.html - version='1.7.8', + version='1.7.9', description='Moesif Middleware for Python Django', long_description=long_description, From bc4c68084ce28ce0fe64c1440a35292906cd273c Mon Sep 17 00:00:00 2001 From: Keyur Date: Tue, 16 Feb 2021 22:01:50 -0800 Subject: [PATCH 2/3] Remove: Celery support Remove: Celery support Add: Event Queue size config option Refactor: Update moesifapi to 1.3.3 Refactor: Update README.md Bump version to 2.0.0 --- README.md | 61 ++++------- moesifdjango/job_scheduler.py | 29 ++++-- moesifdjango/middleware.py | 172 ++++++++----------------------- moesifdjango/middleware_pre19.py | 62 ++++++----- moesifdjango/tasks.py | 126 ---------------------- requirements.txt | 2 +- setup.py | 2 +- 7 files changed, 117 insertions(+), 337 deletions(-) delete mode 100644 moesifdjango/tasks.py diff --git a/README.md b/README.md index a609272..0774be6 100755 --- a/README.md +++ b/README.md @@ -13,9 +13,6 @@ going out to third parties and sends to [Moesif](https://www.moesif.com) for API This SDK uses the Requests library and will work for Python 2.7 — 3.5. -For high volume APIs, you can [enable Celery](#use_celery) which offloads the logging in a separate task. -Install celery via `pip install celery` and set `USE_CELERY` to True - ## How to install ```shell @@ -26,7 +23,6 @@ pip install moesifdjango In your `settings.py` file in your Django project directory, please add `moesifdjango.middleware.moesif_middleware` to the MIDDLEWARE array. -If you plan to use celery as the backend of asynchronous delivered logged requests, you also need to add `moesifdjango` to your `INSTALLED_APPS`. Because of middleware execution order, it is best to add moesifdjango middleware __below__ SessionMiddleware and AuthenticationMiddleware, because they add useful session data that enables deeper error analysis. On the other hand, if you have other middleware that modified response before going out, you may choose to place Moesif middleware __above__ the middleware modifying response. This allows Moesif to see the modifications to the response data and see closer to what is going over the wire. @@ -87,7 +83,7 @@ After signing up for a Moesif account, your Moesif Application Id will be displa You can always find your Moesif Application Id at any time by logging into the [_Moesif Portal_](https://www.moesif.com/), click on the top right menu, -and then clicking _Installation_. +and then clicking _API Keys_. ## Configuration options @@ -121,6 +117,9 @@ to add custom metadata that will be associated with the event. The metadata must #### __`BATCH_SIZE`__ (optional) __int__, default 25, Maximum batch size when sending to Moesif. +#### __`EVENT_QUEUE_SIZE`__ +(optional) __int__, default 10000, Maximum number of events to hold in queue before sending to Moesif. In case of network issues when not able to connect/send event to Moesif, skips adding new to event to queue to prevent memory overflow. + #### __`AUTHORIZATION_HEADER_NAME`__ (optional) _string_, A request header field name used to identify the User in Moesif. Default value is `authorization`. Also, supports a comma separated string. We will check headers in order like `"X-Api-Key,Authorization"`. @@ -161,28 +160,6 @@ but different frameworks and your implementation might be very different, it wou ##### __`LOG_BODY_OUTGOING`__ (optional) _boolean_, default True, Set to False to remove logging request and response body. -#### __`USE_CELERY`__ -_boolean_, Default False. Set to True to use Celery for queuing sending data to Moesif. Check out [Celery documentation](http://docs.celeryproject.org) for more info. - -##### How to use Celery - -__Because celery is optional, moesifdjango does not prepackage Celery as a dependency. -Make sure you install celery via `pip install celery`__ - -Install celery and redis with `pip install "celery[redis]"` - -*Please Note:* If you're using Celery 3.1 or earlier, install celery and redis with `pip install celery==3.1.25` and `pip install redis==2.10.6` - -Set the configuration option to `USE_CELERY` to `True`. - -```python -MOESIF_MIDDLEWARE = { - 'USE_CELERY': True -} -``` - -Start the celery worker with `celery -A worker --loglevel=debug` - #### __`LOCAL_DEBUG`__ _boolean_, set to True to print internal log messages for debugging SDK integration issues. @@ -234,7 +211,6 @@ MOESIF_MIDDLEWARE = { 'SKIP': should_skip, 'MASK_EVENT_MODEL': mask_event, 'GET_METADATA': get_metadata, - 'USE_CELERY': False } ``` @@ -409,27 +385,26 @@ update_companies = middleware.update_companies_batch([userA, userB]) ## Tested versions -Moesif has validated moesifdjango against the following combinations. -Using the Celery queing service is optional, but can be enabled to enable higher performance. - -| Python | Django | Celery | Redis | Test with Celery | Test w/o Celery | -| ------------ | ------- | ------ | ------ | ---------------- | --------------- | -| Python 2.7 | 1.11.22 | 3.1.25 | 2.10.6 | Yes | Yes | -| Python 2.7 | 1.11.22 | 4.3.0 | 3.2.1 | Yes | Yes | -| Python 2.7 | 1.9 | | | | Yes | -| Python 3.4.5 | 1.11.22 | 3.1.25 | 2.10.6 | | Yes | -| Python 3.4.5 | 1.11.22 | 4.3.0 | 3.2.1 | | Yes | -| Python 3.4.5 | 1.9 | | | Yes | | -| Python 3.6.4 | 1.11.22 | 3.1.25 | 2.10.6 | Yes | Yes | -| Python 3.6.4 | 1.11.22 | 4.3.0 | 3.2.1 | Yes | Yes | -| Python 3.6.4 | 1.9 | | | | Yes | +Moesif has validated moesifdjango against the following combinations. + +| Python | Django | +| ------------ | ------- | +| Python 2.7 | 1.11.22 | +| Python 2.7 | 1.11.22 | +| Python 2.7 | 1.9 | +| Python 3.4.5 | 1.11.22 | +| Python 3.4.5 | 1.11.22 | +| Python 3.4.5 | 1.9 | +| Python 3.6.4 | 1.11.22 | +| Python 3.6.4 | 1.11.22 | +| Python 3.6.4 | 1.9 | ## How to test 1. Manually clone the git repo 2. Invoke `pip install Django` if you haven't done so. 3. Invoke `pip install moesifdjango` -3. Add your own application id to 'tests/settings.py'. You can find your Application Id from [_Moesif Dashboard_](https://www.moesif.com/) -> _Top Right Menu_ -> _Installation_ +3. Add your own application id to 'tests/settings.py'. You can find your Application Id from [_Moesif Dashboard_](https://www.moesif.com/) -> _Top Right Menu_ -> _API Keys_ 4. From terminal/cmd navigate to the root directory of the middleware tests. 5. Invoke `python manage.py test` if you are using Django 1.10 or newer. 6. Invoke `python manage.py test middleware_pre19_tests` if you are using Django 1.9 or older. diff --git a/moesifdjango/job_scheduler.py b/moesifdjango/job_scheduler.py index d72d546..284db79 100644 --- a/moesifdjango/job_scheduler.py +++ b/moesifdjango/job_scheduler.py @@ -1,8 +1,10 @@ +from datetime import datetime +from .app_config import AppConfig class JobScheduler: def __init__(self): - pass + self.app_config = AppConfig() @classmethod def exit_handler(cls, scheduler, debug): @@ -13,8 +15,7 @@ def exit_handler(cls, scheduler, debug): if debug: print("Error while closing the queue or scheduler shut down") - @classmethod - def send_events(cls, api_client, batch_events, debug): + def send_events(self, api_client, batch_events, debug, last_event_sent_time): try: if debug: print("Sending events to Moesif") @@ -23,15 +24,29 @@ def send_events(cls, api_client, batch_events, debug): print("Events sent successfully") # Fetch Config ETag from response header batch_events_response_config_etag = batch_events_api_response.get("X-Moesif-Config-ETag") + # Set the last time event sent + last_event_sent_time = datetime.utcnow() # Return Config Etag - return batch_events_response_config_etag + return batch_events_response_config_etag, last_event_sent_time except Exception as ex: if debug: print("Error sending event to Moesif") print(str(ex)) - return None + return None, last_event_sent_time + + # Function to fetch application config + def fetch_app_config(self, config, config_etag, sampling_percentage, last_updated_time, api_client, debug): + try: + config = self.app_config.get_config(api_client, debug) + if config: + config_etag, sampling_percentage, last_updated_time = self.app_config.parse_configuration(config, debug) + except Exception as e: + if debug: + print('Error while fetching the application configuration') + print(str(e)) + return config, config_etag, sampling_percentage, last_updated_time - def batch_events(self, api_client, moesif_events_queue, debug, batch_size): + def batch_events(self, api_client, moesif_events_queue, debug, batch_size, last_event_sent_time): batch_events = [] try: while not moesif_events_queue.empty(): @@ -40,7 +55,7 @@ def batch_events(self, api_client, moesif_events_queue, debug, batch_size): break if batch_events: - batch_response = self.send_events(api_client, batch_events, debug) + batch_response = self.send_events(api_client, batch_events, debug, last_event_sent_time) batch_events[:] = [] return batch_response else: diff --git a/moesifdjango/middleware.py b/moesifdjango/middleware.py index 11315c5..ab7e2f2 100644 --- a/moesifdjango/middleware.py +++ b/moesifdjango/middleware.py @@ -30,37 +30,10 @@ from .logger_helper import LoggerHelper from .event_mapper import EventMapper from .job_scheduler import JobScheduler - -CELERY = False -if settings.MOESIF_MIDDLEWARE.get('USE_CELERY', False): - try: - import celery - from .tasks import async_client_create_event - from kombu import Connection - try: - BROKER_URL = settings.BROKER_URL - if BROKER_URL: - CELERY = True - else: - CELERY = False - except AttributeError: - BROKER_URL = settings.MOESIF_MIDDLEWARE.get('CELERY_BROKER_URL', None) - if BROKER_URL: - CELERY = True - else: - print("USE_CELERY flag was set to TRUE, but BROKER_URL not found") - CELERY = False - - try: - conn = Connection(BROKER_URL) - simple_queue = conn.SimpleQueue('moesif_events_queue') - except: - print("Error while connecting to - {0}".format(BROKER_URL)) - - except: - print("USE_CELERY flag was set to TRUE, but celery package not found.") - CELERY = False - +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED +import atexit class moesif_middleware: def __init__(self, get_response): @@ -73,6 +46,7 @@ def __init__(self, get_response): # below comment for setting moesif base_uri to a test server. if self.middleware_settings.get('LOCAL_DEBUG', False): Configuration.BASE_URI = self.middleware_settings.get('LOCAL_MOESIF_BASEURL', 'https://api.moesif.net') + Configuration.version = 'moesifdjango-python/2.0.0' if settings.MOESIF_MIDDLEWARE.get('CAPTURE_OUTGOING_REQUESTS', False): try: if self.DEBUG: @@ -97,10 +71,12 @@ def __init__(self, get_response): self.sampling_percentage = 100 self.config_etag = None self.last_updated_time = datetime.utcnow() - self.mo_events_queue = queue.Queue() + self.last_event_sent_time = datetime.utcnow() + self.scheduler = BackgroundScheduler(daemon=True) + self.event_queue_size = self.middleware_settings.get('EVENT_QUEUE_SIZE', 10000) + self.mo_events_queue = queue.Queue(maxsize=self.event_queue_size) self.event_batch_size = self.middleware_settings.get('BATCH_SIZE', 25) self.is_event_job_scheduled = False - self.is_config_job_scheduled = False try: if self.config: self.config_etag, self.sampling_percentage, self.last_updated_time = self.app_config.parse_configuration(self.config, self.DEBUG) @@ -109,9 +85,8 @@ def __init__(self, get_response): print('Error while parsing application configuration on initialization') print(str(e)) try: - if not CELERY: - self.schedule_event_background_job() - self.is_event_job_scheduled = True + self.schedule_event_background_job() + self.is_event_job_scheduled = True except Exception as ex: self.is_event_job_scheduled = False if self.DEBUG: @@ -125,12 +100,15 @@ def event_listener(self, event): print('Error reading response from the scheduled event job') else: if event.retval: - if event.retval is not None \ + response_etag, self.last_event_sent_time = event.retval + if response_etag is not None \ and self.config_etag is not None \ - and self.config_etag != event.retval \ + and self.config_etag != response_etag \ and datetime.utcnow() > self.last_updated_time + timedelta(minutes=5): try: - self.fetch_app_config() + self.config, self.config_etag, self.sampling_percentage, self.last_updated_time = \ + self.job_scheduler.fetch_app_config(self.config, self.config_etag, self.sampling_percentage, + self.last_updated_time, self.api_client, self.DEBUG) except Exception as ex: if self.DEBUG: print('Error while updating the application configuration') @@ -139,74 +117,30 @@ def event_listener(self, event): # Function to schedule send event job in async def schedule_event_background_job(self): try: - from apscheduler.schedulers.background import BackgroundScheduler - from apscheduler.triggers.interval import IntervalTrigger - from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED - import atexit - - scheduler = BackgroundScheduler(daemon=True) - scheduler.add_listener(self.event_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) - scheduler.start() - try: - scheduler.add_job( - func=lambda: self.job_scheduler.batch_events(self.api_client, self.mo_events_queue, self.DEBUG, self.event_batch_size), + if not self.scheduler.get_jobs(): + + self.scheduler.add_listener(self.event_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) + self.scheduler.start() + self.scheduler.add_job( + func=lambda: self.job_scheduler.batch_events(self.api_client, self.mo_events_queue, self.DEBUG, + self.event_batch_size, self.last_event_sent_time), trigger=IntervalTrigger(seconds=2), id='moesif_events_batch_job', name='Schedule events batch job every 2 second', replace_existing=True) - # Exit handler when exiting the app - atexit.register(lambda: self.job_scheduler.exit_handler(scheduler, self.DEBUG)) - except Exception as ex: - if self.DEBUG: - print("Error while calling async function") - print(str(ex)) - except Exception as e: - if self.DEBUG: - print("Error when scheduling the job") - print(str(e)) - - # Function to fetch application config - def fetch_app_config(self): - try: - self.config = self.app_config.get_config(self.api_client, self.DEBUG) - if self.config: - self.config_etag, self.sampling_percentage, self.last_updated_time = self.app_config.parse_configuration(self.config, self.DEBUG) - except Exception as e: - if self.DEBUG: - print('Error while fetching the application configuration') - print(str(e)) - - def schedule_app_config_job(self): - try: - from apscheduler.schedulers.background import BackgroundScheduler - from apscheduler.triggers.interval import IntervalTrigger - import atexit - - scheduler = BackgroundScheduler(daemon=True) - scheduler.start() - try: - scheduler.add_job( - func=lambda: self.fetch_app_config(), - trigger=IntervalTrigger(minutes=5), - id='moesif_app_config_job', - name='Schedule app config job every 5 minutes', - replace_existing=True) - # Avoid passing logging message to the ancestor loggers logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING) logging.getLogger('apscheduler.executors.default').propagate = False # Exit handler when exiting the app - atexit.register(lambda: self.job_scheduler.exit_handler(scheduler, self.DEBUG)) - except Exception as ex: - if self.DEBUG: - print("Error while calling app config async function") - print(str(ex)) - except Exception as e: + atexit.register(lambda: self.job_scheduler.exit_handler(self.scheduler, self.DEBUG)) + else: + self.last_event_sent_time = datetime.utcnow() + except Exception as ex: if self.DEBUG: - print("Error when scheduling the app config job") - print(str(e)) + print("Error when scheduling the job") + print(str(ex)) def __call__(self, request): # Code to be executed for each request before @@ -292,47 +226,23 @@ def __call__(self, request): # Mask Event Model event_model = self.logger_helper.mask_event(event_model, self.middleware_settings, self.DEBUG) - def sending_event(): - try: - message = event_model.to_dictionary() - simple_queue.put(message) - if self.DEBUG: - print("Event added to the queue") - except Exception as exc: - if self.DEBUG: - print("Error while adding event to the queue") - print(str(exc)) - # Create random percentage random_percentage = random.random() * 100 self.sampling_percentage = self.app_config.get_sampling_percentage(self.config, user_id, company_id) if self.sampling_percentage >= random_percentage: event_model.weight = 1 if self.sampling_percentage == 0 else math.floor(100 / self.sampling_percentage) - if CELERY: - sending_event() - try: - if not self.is_config_job_scheduled: - self.schedule_app_config_job() - self.is_config_job_scheduled = True - except Exception as e: - if self.DEBUG: - print('Error while starting the app config scheduler job in background') - print(str(e)) - self.is_config_job_scheduled = False - else: - try: - if self.is_event_job_scheduled: - if self.DEBUG: - print("Add Event to the queue") - self.mo_events_queue.put(event_model) - else: - self.schedule_event_background_job() - self.is_event_job_scheduled = True - except Exception as ex: + try: + if self.is_event_job_scheduled and datetime.utcnow() < self.last_event_sent_time + timedelta(minutes=5): if self.DEBUG: - print("Error while adding event to the queue") - print(str(ex)) - self.is_event_job_scheduled = False + print("Add Event to the queue") + self.mo_events_queue.put(event_model) + else: + self.schedule_event_background_job() + self.is_event_job_scheduled = True + except Exception as ex: + if self.DEBUG: + print("Error while adding event to the queue") + print(str(ex)) return response diff --git a/moesifdjango/middleware_pre19.py b/moesifdjango/middleware_pre19.py index 2b6febe..e9d8b2d 100644 --- a/moesifdjango/middleware_pre19.py +++ b/moesifdjango/middleware_pre19.py @@ -5,6 +5,7 @@ import random import math import queue +import logging from django.conf import settings from django.utils import timezone from moesifapi.moesif_api_client import * @@ -28,6 +29,10 @@ from .logger_helper import LoggerHelper from .event_mapper import EventMapper from .job_scheduler import JobScheduler +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED +import atexit class MoesifMiddlewarePre19(object): @@ -40,6 +45,7 @@ def __init__(self): # below comment for setting moesif base_uri to a test server. if self.middleware_settings.get('LOCAL_DEBUG', False): Configuration.BASE_URI = self.middleware_settings.get('LOCAL_MOESIF_BASEURL', 'https://api.moesif.net') + Configuration.version = 'moesifdjango-python/2.0.0' if settings.MOESIF_MIDDLEWARE.get('CAPTURE_OUTGOING_REQUESTS', False): try: if self.DEBUG: @@ -65,7 +71,10 @@ def __init__(self): self.sampling_percentage = 100 self.config_etag = None self.last_updated_time = datetime.utcnow() - self.mo_events_queue = queue.Queue() + self.last_event_sent_time = datetime.utcnow() + self.scheduler = BackgroundScheduler(daemon=True) + self.event_queue_size = self.middleware_settings.get('EVENT_QUEUE_SIZE', 10000) + self.mo_events_queue = queue.Queue(maxsize=self.event_queue_size) self.event_batch_size = self.middleware_settings.get('BATCH_SIZE', 25) self.is_event_job_scheduled = False try: @@ -89,17 +98,18 @@ def __init__(self): def event_listener(self, event): if event.exception: if self.DEBUG: - print('Error reading response from the event scheduled job') + print('Error reading response from the scheduled event job') else: if event.retval: - if event.retval is not None \ + response_etag, self.last_event_sent_time = event.retval + if response_etag is not None \ and self.config_etag is not None \ - and self.config_etag != event.retval \ + and self.config_etag != response_etag \ and datetime.utcnow() > self.last_updated_time + timedelta(minutes=5): try: - self.config = self.app_config.get_config(self.api_client, self.DEBUG) - self.config_etag, self.sampling_percentage, self.last_updated_time = self.app_config.parse_configuration( - self.config, self.DEBUG) + self.config, self.config_etag, self.sampling_percentage, self.last_updated_time = \ + self.job_scheduler.fetch_app_config(self.config, self.config_etag, self.sampling_percentage, + self.last_updated_time, self.api_client, self.DEBUG) except Exception as ex: if self.DEBUG: print('Error while updating the application configuration') @@ -108,32 +118,29 @@ def event_listener(self, event): # Function to schedule send event job in async def schedule_event_background_job(self): try: - from apscheduler.schedulers.background import BackgroundScheduler - from apscheduler.triggers.interval import IntervalTrigger - from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED - import atexit - - scheduler = BackgroundScheduler(daemon=True) - scheduler.add_listener(self.event_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) - scheduler.start() - try: - scheduler.add_job( - func=lambda: self.job_scheduler.batch_events(self.api_client, self.mo_events_queue, self.DEBUG, self.event_batch_size), + if not self.scheduler.get_jobs(): + self.scheduler.add_listener(self.event_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) + self.scheduler.start() + self.scheduler.add_job( + func=lambda: self.job_scheduler.batch_events(self.api_client, self.mo_events_queue, self.DEBUG, + self.event_batch_size, self.last_event_sent_time), trigger=IntervalTrigger(seconds=2), id='moesif_events_batch_job', name='Schedule events batch job every 2 second', replace_existing=True) + # Avoid passing logging message to the ancestor loggers + logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING) + logging.getLogger('apscheduler.executors.default').propagate = False + # Exit handler when exiting the app - atexit.register(lambda: self.job_scheduler.exit_handler(scheduler, self.DEBUG)) - except Exception as ex: - if self.DEBUG: - print("Error while calling async function") - print(str(ex)) - except Exception as e: + atexit.register(lambda: self.job_scheduler.exit_handler(self.scheduler, self.DEBUG)) + else: + self.last_event_sent_time = datetime.utcnow() + except Exception as ex: if self.DEBUG: - print("Error when scheduling the event job") - print(str(e)) + print("Error when scheduling the job") + print(str(ex)) @classmethod def process_request(cls, request): @@ -226,7 +233,7 @@ def process_response(self, request, response): if self.sampling_percentage >= random_percentage: event_model.weight = 1 if self.sampling_percentage == 0 else math.floor(100 / self.sampling_percentage) try: - if self.is_event_job_scheduled: + if self.is_event_job_scheduled and datetime.utcnow() < self.last_event_sent_time + timedelta(minutes=5): if self.DEBUG: print("Add Event to the queue") self.mo_events_queue.put(event_model) @@ -237,7 +244,6 @@ def process_response(self, request, response): if self.DEBUG: print("Error while adding event to the queue") print(str(ex)) - self.is_event_job_scheduled = False return response diff --git a/moesifdjango/tasks.py b/moesifdjango/tasks.py deleted file mode 100644 index 2777368..0000000 --- a/moesifdjango/tasks.py +++ /dev/null @@ -1,126 +0,0 @@ -""" -__author__ = 'mpetyx (Michael Petychakis)' -__version__ = "1.0.0" -__maintainer__ = "Michael Petychakis" -__email__ = "michael@orfium.com" -__status__ = "Production" -""" - -from __future__ import absolute_import, unicode_literals -from django.conf import settings -from .http_response_catcher import HttpResponseCatcher -from celery import shared_task -from moesifapi.moesif_api_client import MoesifAPIClient -from moesifapi.models import EventModel - -# Initialization -middleware_settings = settings.MOESIF_MIDDLEWARE -client = MoesifAPIClient(middleware_settings.get('APPLICATION_ID')) -BATCH_SIZE = settings.MOESIF_MIDDLEWARE.get('BATCH_SIZE', 25) -DEBUG = middleware_settings.get('LOCAL_DEBUG', False) -api_client = client.api -response_catcher = HttpResponseCatcher() -api_client.http_call_back = response_catcher - -try: - get_broker_url = settings.BROKER_URL - if get_broker_url: - BROKER_URL = get_broker_url - else: - BROKER_URL = None -except AttributeError: - BROKER_URL = settings.MOESIF_MIDDLEWARE.get('CELERY_BROKER_URL', None) - - -def queue_get_all(queue_events): - events = [] - for num_of_events_retrieved in range(0, BATCH_SIZE): - try: - if num_of_events_retrieved == BATCH_SIZE: - break - events.append(queue_events.get_nowait()) - except: - break - return events - -@shared_task(ignore_result=True) -def async_client_create_event(moesif_events_queue): - batch_events = [] - try: - queue_size = moesif_events_queue.qsize() - while queue_size > 0: - message = queue_get_all(moesif_events_queue) - for event in message: - batch_events.append(EventModel().from_dictionary(event.payload)) - event.ack() - try: - queue_size = moesif_events_queue.qsize() - except ChannelError: - queue_size = 0 - except ChannelError: - if DEBUG: - print("No message to read from the queue") - - if batch_events: - try: - if DEBUG: - print("Sending events to Moesif") - api_client.create_events_batch(batch_events) - if DEBUG: - print("Events sent successfully") - except Exception as ex: - if DEBUG: - print("Error sending events to Moesif") - print(str(ex)) - else: - if DEBUG: - print("No events to send") - -def exit_handler(moesif_events_queue, scheduler): - try: - # Close the queue - moesif_events_queue.close() - # Shut down the scheduler - scheduler.shutdown() - except: - if DEBUG: - print("Error while closing the queue or scheduler shut down") - -CELERY = False -if settings.MOESIF_MIDDLEWARE.get('USE_CELERY', False): - if BROKER_URL: - try: - from apscheduler.schedulers.background import BackgroundScheduler - from apscheduler.triggers.interval import IntervalTrigger - from kombu import Connection - from kombu.exceptions import ChannelError - import atexit - import logging - - scheduler = BackgroundScheduler(daemon=True) - scheduler.start() - try: - conn = Connection(BROKER_URL) - moesif_events_queue = conn.SimpleQueue('moesif_events_queue') - scheduler.add_job( - func=lambda: async_client_create_event(moesif_events_queue), - trigger=IntervalTrigger(seconds=5), - id='moesif_events_batch_job', - name='Schedule events batch job every 5 second', - replace_existing=True) - - # Avoid passing logging message to the ancestor loggers - logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING) - logging.getLogger('apscheduler.executors.default').propagate = False - - # Exit handler when exiting the app - atexit.register(lambda: exit_handler(moesif_events_queue, scheduler)) - except: - if DEBUG: - print("Error while connecting to - {0}".format(BROKER_URL)) - except: - if DEBUG: - print("Error when scheduling the job") - else: - if DEBUG: - print("Unable to schedule the job as the BROKER_URL is not provided") diff --git a/requirements.txt b/requirements.txt index 5f2aa75..7c76734 100755 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ jsonpickle==0.7.1 python-dateutil==2.5.3 nose==1.3.7 isodatetimehandler==1.0.2 -moesifapi==1.3.2 +moesifapi==1.3.3 celery>=3.1.25 moesifpythonrequest==0.2.0 apscheduler==3.6.1 diff --git a/setup.py b/setup.py index e6e699e..8acfb2a 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ # Versions should comply with PEP440. For a discussion on single-sourcing # the version across setup.py and the project code, see # https://packaging.python.org/en/latest/single_source_version.html - version='1.7.9', + version='2.0.0', description='Moesif Middleware for Python Django', long_description=long_description, From e71d1d27aa4a13fd727161f174ac03548f44691d Mon Sep 17 00:00:00 2001 From: Keyur Date: Wed, 17 Feb 2021 12:27:41 -0800 Subject: [PATCH 3/3] Refactor: Last Job run time Refactor: Last Job run time --- moesifdjango/job_scheduler.py | 19 +++++++++++-------- moesifdjango/middleware.py | 10 +++++----- moesifdjango/middleware_pre19.py | 10 +++++----- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/moesifdjango/job_scheduler.py b/moesifdjango/job_scheduler.py index 284db79..befe961 100644 --- a/moesifdjango/job_scheduler.py +++ b/moesifdjango/job_scheduler.py @@ -15,7 +15,7 @@ def exit_handler(cls, scheduler, debug): if debug: print("Error while closing the queue or scheduler shut down") - def send_events(self, api_client, batch_events, debug, last_event_sent_time): + def send_events(self, api_client, batch_events, debug): try: if debug: print("Sending events to Moesif") @@ -24,15 +24,13 @@ def send_events(self, api_client, batch_events, debug, last_event_sent_time): print("Events sent successfully") # Fetch Config ETag from response header batch_events_response_config_etag = batch_events_api_response.get("X-Moesif-Config-ETag") - # Set the last time event sent - last_event_sent_time = datetime.utcnow() # Return Config Etag - return batch_events_response_config_etag, last_event_sent_time + return batch_events_response_config_etag except Exception as ex: if debug: print("Error sending event to Moesif") print(str(ex)) - return None, last_event_sent_time + return None # Function to fetch application config def fetch_app_config(self, config, config_etag, sampling_percentage, last_updated_time, api_client, debug): @@ -46,7 +44,10 @@ def fetch_app_config(self, config, config_etag, sampling_percentage, last_update print(str(e)) return config, config_etag, sampling_percentage, last_updated_time - def batch_events(self, api_client, moesif_events_queue, debug, batch_size, last_event_sent_time): + def batch_events(self, api_client, moesif_events_queue, debug, batch_size): + # Set the last time event job ran + last_event_job_run_time = datetime.utcnow() + batch_events = [] try: while not moesif_events_queue.empty(): @@ -55,12 +56,14 @@ def batch_events(self, api_client, moesif_events_queue, debug, batch_size, last_ break if batch_events: - batch_response = self.send_events(api_client, batch_events, debug, last_event_sent_time) + batch_response = self.send_events(api_client, batch_events, debug) batch_events[:] = [] - return batch_response + return batch_response, last_event_job_run_time else: if debug: print("No events to send") + return None, last_event_job_run_time except: if debug: print("No message to read from the queue") + return None, last_event_job_run_time diff --git a/moesifdjango/middleware.py b/moesifdjango/middleware.py index ab7e2f2..b7f6256 100644 --- a/moesifdjango/middleware.py +++ b/moesifdjango/middleware.py @@ -71,7 +71,7 @@ def __init__(self, get_response): self.sampling_percentage = 100 self.config_etag = None self.last_updated_time = datetime.utcnow() - self.last_event_sent_time = datetime.utcnow() + self.last_event_job_run_time = datetime.utcnow() self.scheduler = BackgroundScheduler(daemon=True) self.event_queue_size = self.middleware_settings.get('EVENT_QUEUE_SIZE', 10000) self.mo_events_queue = queue.Queue(maxsize=self.event_queue_size) @@ -100,7 +100,7 @@ def event_listener(self, event): print('Error reading response from the scheduled event job') else: if event.retval: - response_etag, self.last_event_sent_time = event.retval + response_etag, self.last_event_job_run_time = event.retval if response_etag is not None \ and self.config_etag is not None \ and self.config_etag != response_etag \ @@ -123,7 +123,7 @@ def schedule_event_background_job(self): self.scheduler.start() self.scheduler.add_job( func=lambda: self.job_scheduler.batch_events(self.api_client, self.mo_events_queue, self.DEBUG, - self.event_batch_size, self.last_event_sent_time), + self.event_batch_size), trigger=IntervalTrigger(seconds=2), id='moesif_events_batch_job', name='Schedule events batch job every 2 second', @@ -136,7 +136,7 @@ def schedule_event_background_job(self): # Exit handler when exiting the app atexit.register(lambda: self.job_scheduler.exit_handler(self.scheduler, self.DEBUG)) else: - self.last_event_sent_time = datetime.utcnow() + self.last_event_job_run_time = datetime.utcnow() except Exception as ex: if self.DEBUG: print("Error when scheduling the job") @@ -232,7 +232,7 @@ def __call__(self, request): if self.sampling_percentage >= random_percentage: event_model.weight = 1 if self.sampling_percentage == 0 else math.floor(100 / self.sampling_percentage) try: - if self.is_event_job_scheduled and datetime.utcnow() < self.last_event_sent_time + timedelta(minutes=5): + if self.is_event_job_scheduled and datetime.utcnow() < self.last_event_job_run_time + timedelta(minutes=5): if self.DEBUG: print("Add Event to the queue") self.mo_events_queue.put(event_model) diff --git a/moesifdjango/middleware_pre19.py b/moesifdjango/middleware_pre19.py index e9d8b2d..b46b706 100644 --- a/moesifdjango/middleware_pre19.py +++ b/moesifdjango/middleware_pre19.py @@ -71,7 +71,7 @@ def __init__(self): self.sampling_percentage = 100 self.config_etag = None self.last_updated_time = datetime.utcnow() - self.last_event_sent_time = datetime.utcnow() + self.last_event_job_run_time = datetime.utcnow() self.scheduler = BackgroundScheduler(daemon=True) self.event_queue_size = self.middleware_settings.get('EVENT_QUEUE_SIZE', 10000) self.mo_events_queue = queue.Queue(maxsize=self.event_queue_size) @@ -101,7 +101,7 @@ def event_listener(self, event): print('Error reading response from the scheduled event job') else: if event.retval: - response_etag, self.last_event_sent_time = event.retval + response_etag, self.last_event_job_run_time = event.retval if response_etag is not None \ and self.config_etag is not None \ and self.config_etag != response_etag \ @@ -123,7 +123,7 @@ def schedule_event_background_job(self): self.scheduler.start() self.scheduler.add_job( func=lambda: self.job_scheduler.batch_events(self.api_client, self.mo_events_queue, self.DEBUG, - self.event_batch_size, self.last_event_sent_time), + self.event_batch_size), trigger=IntervalTrigger(seconds=2), id='moesif_events_batch_job', name='Schedule events batch job every 2 second', @@ -136,7 +136,7 @@ def schedule_event_background_job(self): # Exit handler when exiting the app atexit.register(lambda: self.job_scheduler.exit_handler(self.scheduler, self.DEBUG)) else: - self.last_event_sent_time = datetime.utcnow() + self.last_event_job_run_time = datetime.utcnow() except Exception as ex: if self.DEBUG: print("Error when scheduling the job") @@ -233,7 +233,7 @@ def process_response(self, request, response): if self.sampling_percentage >= random_percentage: event_model.weight = 1 if self.sampling_percentage == 0 else math.floor(100 / self.sampling_percentage) try: - if self.is_event_job_scheduled and datetime.utcnow() < self.last_event_sent_time + timedelta(minutes=5): + if self.is_event_job_scheduled and datetime.utcnow() < self.last_event_job_run_time + timedelta(minutes=5): if self.DEBUG: print("Add Event to the queue") self.mo_events_queue.put(event_model)