Skip to content

Commit

Permalink
#237402304 fix scheduler restart issue (#77)
Browse files Browse the repository at this point in the history
* fix scheduler restart issue

* Refactored code and added comments.

* clean up and update middleware pre1.9

---------

Co-authored-by: Praveen Kumar <praveen@moesif.com>
  • Loading branch information
HzANut and praves77 committed Jun 30, 2023
1 parent d23c1a7 commit cfad9ce
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 31 deletions.
63 changes: 48 additions & 15 deletions moesifdjango/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import atexit
from .moesif_gov import *
from .parse_body import ParseBody
from apscheduler.schedulers.base import STATE_STOPPED


class moesif_middleware:
Expand Down Expand Up @@ -85,14 +86,11 @@ def __init__(self, get_response):
self.sampling_percentage = 100
self.config_etag = None
self.rules_etag = None

self.last_updated_time = datetime.utcnow()
self.last_event_job_run_time = datetime(1970, 1, 1, 0, 0) # Assuming job never ran, set it to epoch start time
self.scheduler = None
self._reset_scheduler()
self.event_queue_size = self.middleware_settings.get('EVENT_QUEUE_SIZE', 10000)
self.mo_events_queue = queue.Queue(maxsize=self.event_queue_size)
self.event_batch_size = self.middleware_settings.get('BATCH_SIZE', 25)
self.is_event_job_scheduled = False
try:
if self.config:
self.config_etag, self.sampling_percentage, self.last_updated_time = self.app_config.parse_configuration(
Expand Down Expand Up @@ -292,18 +290,11 @@ def __call__(self, request):
if self.sampling_percentage >= random_percentage:
event_model.weight = 1 if self.sampling_percentage == 0 else math.floor(100 / self.sampling_percentage)
try:
if not self.is_event_job_scheduled and datetime.utcnow() > self.last_event_job_run_time + timedelta(minutes=5):
try:
self.schedule_event_background_job()
self.is_event_job_scheduled = True
self.last_event_job_run_time = datetime.utcnow()
except Exception as ex:
self.is_event_job_scheduled = False
if self.DEBUG:
print('Error while starting the event scheduler job in background')
print(str(ex))
# Create scheduler if needed
self._create_scheduler_if_needed()

if self.DEBUG:
print("Add Event to the queue")
print("Add Event to the queue: ", self.mo_events_queue.qsize())
self.mo_events_queue.put(event_model)
except Exception as ex:
if self.DEBUG:
Expand All @@ -312,6 +303,48 @@ def __call__(self, request):

return response

def _reset_scheduler(self):
"""
Private Method to reset scheduler to original `init` (aka null) state.
"""
try:
# try to clean up before resetting
self.scheduler.remove_job('moesif_events_batch_job')
self.scheduler.shutdown()
except Exception as es:
# ignore because either schedule is null or job is null
# cleanup is not needed
pass
finally:
print("----- Event scheduler will start on next event.")
# Reset initialize it so that next time schedule job is called it gets created again.
self.scheduler = None
self.is_event_job_scheduled = False
self.last_event_job_run_time = datetime(1970, 1, 1, 0, 0) # Assuming job never ran, set it to epoch start time

def _create_scheduler_if_needed(self):
"""
Private method to create scheduler if:
1. first time OR
2. It exists but is in stopped state - so any subsequent add_job will throw exceptions.
"""
# Check if scheduler exist but in stopped state (for some reason stopped by app/diff middlewares)
# Reset it so that it can be restarted
if self.scheduler and self.scheduler.state == STATE_STOPPED:
self._reset_scheduler()

# Create scheduler if needed
if not self.is_event_job_scheduled and datetime.utcnow() > self.last_event_job_run_time + timedelta(minutes=5):
try:
self.schedule_event_background_job()
self.is_event_job_scheduled = True
self.last_event_job_run_time = datetime.utcnow()
except Exception as ex:
self.is_event_job_scheduled = False
if self.DEBUG:
print('Error while starting the event scheduler job in background')
print(str(ex))

def update_user(self, user_profile):
self.user.update_user(user_profile, self.api_client, self.DEBUG)

Expand Down
65 changes: 50 additions & 15 deletions moesifdjango/middleware_pre19.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
import atexit
from apscheduler.schedulers.base import STATE_STOPPED

class MoesifMiddlewarePre19(object):

Expand Down Expand Up @@ -71,12 +72,11 @@ def __init__(self):
self.sampling_percentage = 100
self.config_etag = None
self.last_updated_time = datetime.utcnow()
self.last_event_job_run_time = datetime(1970, 1, 1, 0, 0) # Assuming job never ran, set it to epoch start time
self.scheduler = None
self._reset_scheduler()
self.event_queue_size = self.middleware_settings.get('EVENT_QUEUE_SIZE', 10000)
self.mo_events_queue = queue.Queue(maxsize=self.event_queue_size)
self.event_batch_size = self.middleware_settings.get('BATCH_SIZE', 25)
self.is_event_job_scheduled = False

try:
if self.config:
self.config_etag, self.sampling_percentage, self.last_updated_time = self.app_config.parse_configuration(
Expand Down Expand Up @@ -240,19 +240,10 @@ def process_response(self, request, response):
if self.sampling_percentage >= random_percentage:
event_model.weight = 1 if self.sampling_percentage == 0 else math.floor(100 / self.sampling_percentage)
try:
if not self.is_event_job_scheduled and datetime.utcnow() > self.last_event_job_run_time + timedelta(
minutes=5):
try:
self.schedule_event_background_job()
self.is_event_job_scheduled = True
self.last_event_job_run_time = datetime.utcnow()
except Exception as ex:
self.is_event_job_scheduled = False
if self.DEBUG:
print('Error while starting the event scheduler job in background')
print(str(ex))
# Create scheduler if needed
self._create_scheduler_if_needed()
if self.DEBUG:
print("Add Event to the queue")
print("Add Event to the queue: ", self.mo_events_queue.qsize())
self.mo_events_queue.put(event_model)
except Exception as ex:
if self.DEBUG:
Expand All @@ -261,6 +252,50 @@ def process_response(self, request, response):

return response

def _reset_scheduler(self):
"""
Private Method to reset scheduler to original `init` (aka null) state.
"""

try:
# try to clean up before resetting
self.scheduler.remove_job('moesif_events_batch_job')
self.scheduler.shutdown()
except Exception as es:
# ignore because either schedule is null or job is null
# cleanup is not needed
pass
finally:
print("----- Event scheduler will start on next event.")

# Reset initialize it so that next time schedule job is called it gets created again.
self.scheduler = None
self.is_event_job_scheduled = False
self.last_event_job_run_time = datetime(1970, 1, 1, 0, 0) # Assuming job never ran, set it to epoch start time

def _create_scheduler_if_needed(self):
"""
Private method to create scheduler if:
1. first time OR
2. It exists but is in stopped state - so any subsequent add_job will throw exceptions.
"""
# Check if scheduler exist but in stopped state (for some reason stopped by app/diff middlewares)
# Reset it so that it can be restarted
if self.scheduler and self.scheduler.state == STATE_STOPPED:
self._reset_scheduler()

# Create scheduler if needed
if not self.is_event_job_scheduled and datetime.utcnow() > self.last_event_job_run_time + timedelta(minutes=5):
try:
self.schedule_event_background_job()
self.is_event_job_scheduled = True
self.last_event_job_run_time = datetime.utcnow()
except Exception as ex:
self.is_event_job_scheduled = False
if self.DEBUG:
print('Error while starting the event scheduler job in background')
print(str(ex))

def update_user(self, user_profile):
self.user.update_user(user_profile, self.api_client, self.DEBUG)

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
# Versions should comply with PEP440. For a discussion on single-sourcing
# the version across setup.py and the project code, see
# https://packaging.python.org/en/latest/single_source_version.html
version='2.3.1',
version='2.3.2',

description='Moesif Middleware for Python Django',
long_description=long_description,
Expand Down

0 comments on commit cfad9ce

Please sign in to comment.