Skip to content

Commit

Permalink
RedisMultiScheduler新增任务enable参数
Browse files Browse the repository at this point in the history
  • Loading branch information
leffss committed Aug 2, 2022
1 parent 4cc65bc commit 6a7d0da
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 14 deletions.
38 changes: 34 additions & 4 deletions devops/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@
...
2019-12-07 13:21:39,008
你会发现每次执行时间都会延迟 10-30 毫秒之间(程序执行逻辑耗费的时间),如果任务有严格时间要求,则不适合使用这种类型的任务
cron 任务暂时没发现这个问题
经过测试发现在 interval 任务种加入 "relative": True 后,windows 上面运行的celery可以解决此问题
但是 linux 下运行的 celery 只有第一次任务的毫秒变成1-10的样子,后面还是会递增(使用 -P eventlet 一样)
cron 任务也发现这个问题
经过测试发在任务中加入 "relative": True 后,在我的其他django项目中解决了这个问题,本项目还是有问题
两个项目scheduler代码一样,只是依赖也不太相同,不知道是什么原因了
"""
CELERY_BEAT_FLUSH_TASKS = False # 启动 beat 时是否清空已有任务
CELERY_TIMEZONE = TIME_ZONE # celery 使用的是 utc 时间,需要设置为 django 相同时区
Expand All @@ -293,33 +293,63 @@
# 'schedule': timedelta(seconds=30), # 任务循环时间
# # "args": None, # 参数
# "args": (None, 0, 3), # 参数,可迭代对象,元组或者列表
# 'kwargs': {}, # task 任务函数参数
# 'options': {}, # apply_async 函数附加参数
# 'relative': True,
# 'limit_run_time': 0,
# 'enable': True,
# },
# 'task_check_scheduler_cron': {
# 'task': 'tasks.tasks.task_check_scheduler',
# 'schedule': crontab(minute='*/1', hour='*', day_of_week='*', day_of_month='*', month_of_year='*'), # cron 任务
# # "args": None, # 参数
# "args": (None, 0, 3), # 参数,可迭代对象,元组或者列表
# 'kwargs': {},
# 'options': {},
# 'relative': True,
# 'limit_run_time': 0,
# 'enable': True,
# },
'task_cls_terminalsession': { # 清除 terminalsession 表,系统异常退出时此表可能会有垃圾数据,仅启动时运行一次
'task': 'tasks.tasks.task_cls_terminalsession',
'schedule': timedelta(seconds=3),
'args': None,
'kwargs': {},
'options': {},
"relative": True,
"limit_run_time": 1, # 限制任务执行次数,>=0, 默认 0 为不限制。注意:celery 原版 beat 是不支持此参数的
'enable': True, # 是否启用。注意:celery 原版 beat 是不支持此参数的
},
# 'task_cls_user_logs': { # 清除操作日志定时任务,如不自动清除,注释此任务即可
# 'task': 'tasks.tasks.task_cls_user_logs',
# 'schedule': crontab(minute=5, hour=2),
# "args": (USER_LOGS_KEEP_DAYS,),
# 'kwargs': {},
# 'options': {},
# 'relative': True,
# 'limit_run_time': 0,
# 'enable': True,
# },
'task_cls_terminal_logs': { # 清除终端日志定时任务,如不自动清除,注释此任务即可
'task': 'tasks.tasks.task_cls_terminal_logs',
'schedule': crontab(minute=10, hour=2),
"args": (TERMINAL_LOGS_KEEP_DAYS,),
'kwargs': {},
'options': {},
"relative": True,
"limit_run_time": 0,
'enable': True,
},
'task_cls_batch_logs': { # 清除批量日志定时任务,如不自动清除,注释此任务即可
'task': 'tasks.tasks.task_cls_batch_logs',
'schedule': crontab(minute=15, hour=2),
"args": [BATCH_LOGS_KEEP_DAYS],
# "args": [BATCH_LOGS_KEEP_DAYS],
'args': None,
'kwargs': {'keep_days': BATCH_LOGS_KEEP_DAYS},
'options': {},
"relative": True,
"limit_run_time": 0,
'enable': True,
},
}

Expand Down
39 changes: 32 additions & 7 deletions redismultibeat/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,22 @@
"relative": True,
# "args": None, # 参数
"args": (None, 0, 3), # 参数
"limit_run_time": 5, # 限制运行次数
'kwargs': {},
'options': {},
"relative": True,
"limit_run_time": 5, # 限制任务执行次数,>=0, 默认 0 为不限制。注意:celery 原版 beat 是不支持此参数的
'enable': True, # 是否启用。注意:celery 原版 beat 是不支持此参数的
},
'task_check_scheduler_cron': {
'task': 'tasks.tasks.task_check_scheduler',
'schedule': crontab(minute='*/1', hour='*', day_of_week='*', day_of_month='*', month_of_year='*'), # cron 任务
# "args": None, # 参数
"args": (None, 0, 3), # 参数
'kwargs': {},
'options': {},
"relative": True,
"limit_run_time": 0,
'enable': True,
}
}
Expand All @@ -36,6 +45,11 @@
'task': 'tasks.tasks.task_check_scheduler',
'schedule': timedelta(seconds=7200),
"args": (None, 1, 3),
'kwargs': {},
'options': {},
"relative": True,
"limit_run_time": 0,
'enable': True,
})
# 动态删除任务:
Expand All @@ -47,7 +61,11 @@
'task': 'tasks.tasks.task_check_scheduler',
'schedule': timedelta(seconds=1600),
"args": (None, 1, 3),
"limit_run_time": 5, # 限制运行次数
'kwargs': {},
'options': {},
"relative": True,
"limit_run_time": 0,
'enable': False,
})
manager.close()
Expand Down Expand Up @@ -94,6 +112,7 @@ class CustomScheduleEntry(ScheduleEntry):
total_run_count (int): 参考 celery 官方文档
relative (bool): 参考 celery 官方文档
limit_run_time (int): 限制任务执行次数,>=0, 0 为不限制
enable (bool): 是否启用任务
"""

limit_run_time = 0
Expand Down Expand Up @@ -218,20 +237,26 @@ def _tick(self):
next_times.append(next_time_to_run)
if is_due:
try:
info("scheduler task entry: {} to publisher, total_run_count: {}, limit_run_time: {}".format(entry.name, entry.total_run_count + 1, entry.limit_run_time))
result = self.apply_async(entry) # 添加任务到worker队列
if entry.enable:
info("scheduler task entry: {} to publisher, total_run_count: {}, limit_run_time: {}".format(entry.name, entry.total_run_count + 1, entry.limit_run_time))
result = self.apply_async(entry) # 添加任务到worker队列
debug('%s sent. id->%s', entry.task, result.id)
else:
info(
"task entry disable: {}, total_run_count: {}, limit_run_time: {}".format(
entry.name, entry.total_run_count, entry.limit_run_time))
except Exception as exc:
error('Message Error: %s\n%s',
exc, traceback.format_stack(), exc_info=True)
else:
debug('%s sent. id->%s', entry.task, result.id)
next_entry = self.reserve(entry)
if not entry.enable:
next_entry.total_run_count = next_entry.total_run_count - 1
pipe.zrem(self.key, task) # 删除旧的任务
if next_entry.limit_run_time == 0 or next_entry.total_run_count < next_entry.limit_run_time:
# 将旧任务重新计算时间后再添加
pipe.zadd(self.key, {jsonpickle.encode(next_entry): self._when(next_entry, next_time_to_run, ) or 0})
else:
logger.info("task entry: {} limit to run {} times, stopped".format(entry.name, entry.limit_run_time))
logger.info("task entry: {} limit to run {} times, total run {} times, stopped".format(entry.name, entry.limit_run_time, entry.total_run_count + 1))
pipe.execute()

# 获取最近一个需要执行的任务的时间
Expand Down
8 changes: 5 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ cryptography==3.2
mysqlclient==1.4.4
apscheduler==3.6.3
# django-apscheduler==0.4.2
requests==2.24.0
jsonpickle==1.4.1
# requests==2.24.0
requests==2.27.1
# jsonpickle==1.4.1
jsonpickle==2.2.0
# redisbeat==1.2.4
django-cachalot==2.3.5
django-cachalot==2.3.5
10 changes: 10 additions & 0 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
# 'task': 'tasks.tasks.task_check_scheduler',
# 'schedule': timedelta(seconds=7200),
# "args": (None, 1, 3),
# 'kwargs': {},
# 'options': {},
# "relative": True,
# "limit_run_time": 0,
# 'enable': True,
# }))
# for task in manager.iter_tasks():
# print(task)
Expand All @@ -57,6 +62,11 @@
# 'task': 'tasks.tasks.task_check_scheduler',
# 'schedule': timedelta(seconds=random.randint(3600,7200)),
# "args": (None, 1, 3),
# 'kwargs': {},
# 'options': {},
# "relative": True,
# "limit_run_time": 0,
# 'enable': True,
# }))
# for task in manager.iter_tasks():
# print(task)
Expand Down

0 comments on commit 6a7d0da

Please sign in to comment.