From 6a7d0da2bb793d7818bea437a4656751d2960e44 Mon Sep 17 00:00:00 2001 From: leffss <348926676@qq.com> Date: Tue, 2 Aug 2022 09:49:50 +0800 Subject: [PATCH] =?UTF-8?q?RedisMultiScheduler=E6=96=B0=E5=A2=9E=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1enable=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- devops/settings.py | 38 ++++++++++++++++++++++++++++++++---- redismultibeat/scheduler.py | 39 ++++++++++++++++++++++++++++++------- requirements.txt | 8 +++++--- test.py | 10 ++++++++++ 4 files changed, 81 insertions(+), 14 deletions(-) diff --git a/devops/settings.py b/devops/settings.py index 1a83068..c0dbff1 100644 --- a/devops/settings.py +++ b/devops/settings.py @@ -277,9 +277,9 @@ ... 2019-12-07 13:21:39,008 你会发现每次执行时间都会延迟 10-30 毫秒之间(程序执行逻辑耗费的时间),如果任务有严格时间要求,则不适合使用这种类型的任务 -cron 任务暂时没发现这个问题 -经过测试发现在 interval 任务种加入 "relative": True 后,windows 上面运行的celery可以解决此问题 -但是 linux 下运行的 celery 只有第一次任务的毫秒变成1-10的样子,后面还是会递增(使用 -P eventlet 一样) +cron 任务也发现这个问题 +经过测试发在任务中加入 "relative": True 后,在我的其他django项目中解决了这个问题,本项目还是有问题 +两个项目scheduler代码一样,只是依赖也不太相同,不知道是什么原因了 """ CELERY_BEAT_FLUSH_TASKS = False # 启动 beat 时是否清空已有任务 CELERY_TIMEZONE = TIME_ZONE # celery 使用的是 utc 时间,需要设置为 django 相同时区 @@ -293,33 +293,63 @@ # 'schedule': timedelta(seconds=30), # 任务循环时间 # # "args": None, # 参数 # "args": (None, 0, 3), # 参数,可迭代对象,元组或者列表 + # 'kwargs': {}, # task 任务函数参数 + # 'options': {}, # apply_async 函数附加参数 + # 'relative': True, + # 'limit_run_time': 0, + # 'enable': True, # }, # 'task_check_scheduler_cron': { # 'task': 'tasks.tasks.task_check_scheduler', # 'schedule': crontab(minute='*/1', hour='*', day_of_week='*', day_of_month='*', month_of_year='*'), # cron 任务 # # "args": None, # 参数 # "args": (None, 0, 3), # 参数,可迭代对象,元组或者列表 + # 'kwargs': {}, + # 'options': {}, + # 'relative': True, + # 'limit_run_time': 0, + # 'enable': True, # }, 'task_cls_terminalsession': { # 清除 terminalsession 表,系统异常退出时此表可能会有垃圾数据,仅启动时运行一次 'task': 'tasks.tasks.task_cls_terminalsession', 'schedule': timedelta(seconds=3), + 'args': None, + 'kwargs': {}, + 'options': {}, "relative": True, "limit_run_time": 1, # 限制任务执行次数,>=0, 默认 0 为不限制。注意:celery 原版 beat 是不支持此参数的 + 'enable': True, # 是否启用。注意:celery 原版 beat 是不支持此参数的 }, # 'task_cls_user_logs': { # 清除操作日志定时任务,如不自动清除,注释此任务即可 # 'task': 'tasks.tasks.task_cls_user_logs', # 'schedule': crontab(minute=5, hour=2), # "args": (USER_LOGS_KEEP_DAYS,), + # 'kwargs': {}, + # 'options': {}, + # 'relative': True, + # 'limit_run_time': 0, + # 'enable': True, # }, 'task_cls_terminal_logs': { # 清除终端日志定时任务,如不自动清除,注释此任务即可 'task': 'tasks.tasks.task_cls_terminal_logs', 'schedule': crontab(minute=10, hour=2), "args": (TERMINAL_LOGS_KEEP_DAYS,), + 'kwargs': {}, + 'options': {}, + "relative": True, + "limit_run_time": 0, + 'enable': True, }, 'task_cls_batch_logs': { # 清除批量日志定时任务,如不自动清除,注释此任务即可 'task': 'tasks.tasks.task_cls_batch_logs', 'schedule': crontab(minute=15, hour=2), - "args": [BATCH_LOGS_KEEP_DAYS], + # "args": [BATCH_LOGS_KEEP_DAYS], + 'args': None, + 'kwargs': {'keep_days': BATCH_LOGS_KEEP_DAYS}, + 'options': {}, + "relative": True, + "limit_run_time": 0, + 'enable': True, }, } diff --git a/redismultibeat/scheduler.py b/redismultibeat/scheduler.py index b244f22..488209e 100644 --- a/redismultibeat/scheduler.py +++ b/redismultibeat/scheduler.py @@ -10,13 +10,22 @@ "relative": True, # "args": None, # 参数 "args": (None, 0, 3), # 参数 - "limit_run_time": 5, # 限制运行次数 + 'kwargs': {}, + 'options': {}, + "relative": True, + "limit_run_time": 5, # 限制任务执行次数,>=0, 默认 0 为不限制。注意:celery 原版 beat 是不支持此参数的 + 'enable': True, # 是否启用。注意:celery 原版 beat 是不支持此参数的 }, 'task_check_scheduler_cron': { 'task': 'tasks.tasks.task_check_scheduler', 'schedule': crontab(minute='*/1', hour='*', day_of_week='*', day_of_month='*', month_of_year='*'), # cron 任务 # "args": None, # 参数 "args": (None, 0, 3), # 参数 + 'kwargs': {}, + 'options': {}, + "relative": True, + "limit_run_time": 0, + 'enable': True, } } @@ -36,6 +45,11 @@ 'task': 'tasks.tasks.task_check_scheduler', 'schedule': timedelta(seconds=7200), "args": (None, 1, 3), + 'kwargs': {}, + 'options': {}, + "relative": True, + "limit_run_time": 0, + 'enable': True, }) # 动态删除任务: @@ -47,7 +61,11 @@ 'task': 'tasks.tasks.task_check_scheduler', 'schedule': timedelta(seconds=1600), "args": (None, 1, 3), - "limit_run_time": 5, # 限制运行次数 + 'kwargs': {}, + 'options': {}, + "relative": True, + "limit_run_time": 0, + 'enable': False, }) manager.close() @@ -94,6 +112,7 @@ class CustomScheduleEntry(ScheduleEntry): total_run_count (int): 参考 celery 官方文档 relative (bool): 参考 celery 官方文档 limit_run_time (int): 限制任务执行次数,>=0, 0 为不限制 + enable (bool): 是否启用任务 """ limit_run_time = 0 @@ -218,20 +237,26 @@ def _tick(self): next_times.append(next_time_to_run) if is_due: try: - info("scheduler task entry: {} to publisher, total_run_count: {}, limit_run_time: {}".format(entry.name, entry.total_run_count + 1, entry.limit_run_time)) - result = self.apply_async(entry) # 添加任务到worker队列 + if entry.enable: + info("scheduler task entry: {} to publisher, total_run_count: {}, limit_run_time: {}".format(entry.name, entry.total_run_count + 1, entry.limit_run_time)) + result = self.apply_async(entry) # 添加任务到worker队列 + debug('%s sent. id->%s', entry.task, result.id) + else: + info( + "task entry disable: {}, total_run_count: {}, limit_run_time: {}".format( + entry.name, entry.total_run_count, entry.limit_run_time)) except Exception as exc: error('Message Error: %s\n%s', exc, traceback.format_stack(), exc_info=True) - else: - debug('%s sent. id->%s', entry.task, result.id) next_entry = self.reserve(entry) + if not entry.enable: + next_entry.total_run_count = next_entry.total_run_count - 1 pipe.zrem(self.key, task) # 删除旧的任务 if next_entry.limit_run_time == 0 or next_entry.total_run_count < next_entry.limit_run_time: # 将旧任务重新计算时间后再添加 pipe.zadd(self.key, {jsonpickle.encode(next_entry): self._when(next_entry, next_time_to_run, ) or 0}) else: - logger.info("task entry: {} limit to run {} times, stopped".format(entry.name, entry.limit_run_time)) + logger.info("task entry: {} limit to run {} times, total run {} times, stopped".format(entry.name, entry.limit_run_time, entry.total_run_count + 1)) pipe.execute() # 获取最近一个需要执行的任务的时间 diff --git a/requirements.txt b/requirements.txt index a01fee0..09a3cad 100644 --- a/requirements.txt +++ b/requirements.txt @@ -42,7 +42,9 @@ cryptography==3.2 mysqlclient==1.4.4 apscheduler==3.6.3 # django-apscheduler==0.4.2 -requests==2.24.0 -jsonpickle==1.4.1 +# requests==2.24.0 +requests==2.27.1 +# jsonpickle==1.4.1 +jsonpickle==2.2.0 # redisbeat==1.2.4 -django-cachalot==2.3.5 \ No newline at end of file +django-cachalot==2.3.5 diff --git a/test.py b/test.py index a5fff2c..f108fc8 100644 --- a/test.py +++ b/test.py @@ -47,6 +47,11 @@ # 'task': 'tasks.tasks.task_check_scheduler', # 'schedule': timedelta(seconds=7200), # "args": (None, 1, 3), + # 'kwargs': {}, + # 'options': {}, + # "relative": True, + # "limit_run_time": 0, + # 'enable': True, # })) # for task in manager.iter_tasks(): # print(task) @@ -57,6 +62,11 @@ # 'task': 'tasks.tasks.task_check_scheduler', # 'schedule': timedelta(seconds=random.randint(3600,7200)), # "args": (None, 1, 3), + # 'kwargs': {}, + # 'options': {}, + # "relative": True, + # "limit_run_time": 0, + # 'enable': True, # })) # for task in manager.iter_tasks(): # print(task)