Skip to content

Commit

Permalink
修改 redis beat 任务 relative 不生效问题
Browse files Browse the repository at this point in the history
  • Loading branch information
leffss committed Aug 9, 2022
1 parent 27c9616 commit dd590c8
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 20 deletions.
5 changes: 1 addition & 4 deletions devops/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@
cron 任务也发现这个问题
经过测试发在任务中加入 "relative": True 后,在我的其他django项目中解决了这个问题,本项目还是有问题
两个项目scheduler代码一样,只是依赖也不太相同,不知道是什么原因了
注意:已经修复
"""
CELERY_BEAT_FLUSH_TASKS = False # 启动 beat 时是否清空已有任务
CELERY_TIMEZONE = TIME_ZONE # celery 使用的是 utc 时间,需要设置为 django 相同时区
Expand All @@ -306,7 +307,6 @@
# "args": (None, 0, 3), # 参数,可迭代对象,元组或者列表
# 'kwargs': {},
# 'options': {},
# 'relative': True,
# 'limit_run_time': 0,
# 'enable': True,
# },
Expand All @@ -326,7 +326,6 @@
# "args": (USER_LOGS_KEEP_DAYS,),
# 'kwargs': {},
# 'options': {},
# 'relative': True,
# 'limit_run_time': 0,
# 'enable': True,
# },
Expand All @@ -336,7 +335,6 @@
"args": (TERMINAL_LOGS_KEEP_DAYS,),
'kwargs': {},
'options': {},
"relative": True,
"limit_run_time": 0,
'enable': True,
},
Expand All @@ -347,7 +345,6 @@
'args': None,
'kwargs': {'keep_days': BATCH_LOGS_KEEP_DAYS},
'options': {},
"relative": True,
"limit_run_time": 0,
'enable': True,
},
Expand Down
62 changes: 46 additions & 16 deletions redismultibeat/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
"args": (None, 0, 3), # 参数
'kwargs': {},
'options': {},
"relative": True,
"limit_run_time": 0,
'enable': True,
}
Expand Down Expand Up @@ -74,7 +73,6 @@
import traceback
import sys
from time import mktime
from functools import partial
import jsonpickle
from celery.beat import Scheduler, ScheduleEntry
from redis import Redis
Expand All @@ -84,14 +82,15 @@
from redis.exceptions import LockError
import urllib.parse as urlparse
from kombu.utils.functional import reprcall
from celery.utils.time import maybe_make_aware

logger = get_logger(__name__)
debug, info, error, warning = (logger.debug, logger.info, logger.error, logger.warning)
MAXINT = sys.maxsize # 最大整数,用于提取 redis 中所有任务
DEFAULT_CELERY_BEAT_REDIS_SCHEDULER_URL = 'redis://127.0.0.1:6379/0'
DEFAULT_CELERY_BEAT_REDIS_SCHEDULER_KEY = 'celery:beat:tasks'
DEFAULT_CELERY_BROKER_TRANSPORT_OPTIONS = {"master_name": "master"}
DEFAULT_CELERY_BEAT_MAX_LOOP_INTERVAL = 300
DEFAULT_CELERY_BEAT_MAX_LOOP_INTERVAL = 60
DEFAULT_CELERY_BEAT_REDIS_MULTI_NODE_MODE = False
DEFAULT_CELERY_BEAT_REDIS_LOCK_KEY = 'celery:beat:lock'
DEFAULT_CELERY_BEAT_REDIS_LOCK_TTL = 60
Expand All @@ -100,7 +99,7 @@
DEFAULT_CELERY_BEAT_SCHEDULE = {}


class CustomScheduleEntry(ScheduleEntry):
class RedisScheduleEntry(ScheduleEntry):
"""
重写官方 ScheduleEntry 以支持 limit_run_time 设置
Arguments:
Expand All @@ -118,16 +117,19 @@ class CustomScheduleEntry(ScheduleEntry):

limit_run_time = 0
enable = True
relative = True

def __init__(self, limit_run_time=None, enable=True, relative=True, *args, **kwargs):
    """Schedule entry extended with run-count limiting and enable/relative flags.

    Arguments:
        limit_run_time (int): maximum number of times the task may be
            published; 0 (also the result of passing None) means unlimited.
        enable (bool): when False the beat loop skips publishing this entry.
        relative (bool): forwarded to the base ``ScheduleEntry`` — the
            commit this code comes from indicates the flag previously never
            reached celery, breaking relative/timedelta schedules.
    """
    self.limit_run_time = limit_run_time or 0
    self.enable = enable
    self.relative = relative
    # Forward `relative` explicitly so celery's schedule logic honours it;
    # remaining positional/keyword args are the standard ScheduleEntry
    # fields (name, task, last_run_at, ... — see __reduce__).
    ScheduleEntry.__init__(self, *args, relative=relative, **kwargs)

def __reduce__(self):
    # Support pickle/jsonpickle round-trips: rebuild the entry from the same
    # positional argument order __init__ accepts — the custom fields
    # (limit_run_time, enable, relative) first, then the base ScheduleEntry
    # fields (name, task, last_run_at, total_run_count, schedule, args,
    # kwargs, options).
    return self.__class__, (
        self.limit_run_time,
        self.enable,
        self.relative,
        self.name, self.task, self.last_run_at, self.total_run_count,
        self.schedule, self.args, self.kwargs, self.options,
    )
Expand All @@ -143,26 +145,39 @@ def update(self, other):
'task': other.task, 'schedule': other.schedule,
'args': other.args, 'kwargs': other.kwargs,
'options': other.options, 'limit_run_time': other.limit_run_time,
'enable': other.enable
'enable': other.enable, 'relative': other.relative
})

def __repr__(self):
    """Debug representation including the custom fields (limit_run_time, enable, relative)."""
    # reprcall renders the task as a call expression, e.g. "task(1, x=2)".
    return '<{name}: {0.name} {call} {0.schedule} limit_run_time: {0.limit_run_time} ' \
           'enable: {0.enable} options: {0.options} relative: {0.relative}>'.format(
               self,
               call=reprcall(self.task, self.args or (), self.kwargs or {}),
               name=type(self).__name__,
           )

def editable_fields_equal(self, other):
    """Return True when every user-editable field of *other* matches this entry.

    Run-state fields (last_run_at, total_run_count) are deliberately not
    compared, so an entry is not reported as "changed" merely because it
    has executed since it was stored.
    """
    for attr in ('task', 'args', 'kwargs', 'options', 'schedule',
                 'limit_run_time', 'enable', 'relative'):
        if getattr(self, attr) != getattr(other, attr):
            return False
    return True

def format_json(self):
    """Return a plain-dict snapshot of the entry's editable fields.

    `schedule` is intentionally omitted: schedule objects (crontab /
    timedelta) are not JSON-native types.
    """
    return {
        'name': self.name,
        'task': self.task,
        'args': self.args,
        'kwargs': self.kwargs,
        'options': self.options,
        'limit_run_time': self.limit_run_time,
        'relative': self.relative,
        'enable': self.enable,
    }


class RedisMultiScheduler(Scheduler):
Entry = CustomScheduleEntry
Entry = RedisScheduleEntry

def __init__(self, *args, **kwargs):
app = kwargs['app']
Expand Down Expand Up @@ -191,7 +206,13 @@ def __init__(self, *args, **kwargs):
self.close = self.close_with_no_lock

def _when(self, entry, next_time_to_run, **kwargs):
    """Return a UTC timestamp for when *entry* should next run.

    Mirrors celery's own ``Scheduler._when``: computing the score from an
    aware ``utctimetuple`` (plus microseconds) keeps the redis zset / heapq
    ordering correct in non-UTC timezones. NOTE(review): the commit message
    says the previous naive-localtime version (``entry.schedule.now()``)
    broke 'relative' schedules — this is the fix.
    """
    adjust = self.adjust
    as_now = maybe_make_aware(entry.default_now())
    return (mktime(as_now.utctimetuple()) +
            as_now.microsecond / 1e6 +
            (adjust(next_time_to_run) or 0))

def setup_schedule(self):
# if self.flush_tasks:
Expand Down Expand Up @@ -256,19 +277,20 @@ def is_due(self, entry):
def _tick(self):
tasks = self.rdb.zrangebyscore(
self.key, 0,
self.adjust(mktime(self.app.now().timetuple()), drift=0.010),
self.adjust(mktime(self.app.now().timetuple()), drift=-0.010),
withscores=True) or []
next_times = [self.max_interval, ]
with self.rdb.pipeline() as pipe:
for task, score in tasks:
entry = jsonpickle.decode(task)
is_due, next_time_to_run = self.is_due(entry)
next_times.append(next_time_to_run)
# next_times.append(next_time_to_run)
next_times.append(self.adjust(next_time_to_run))
if is_due:
try:
if entry.enable:
info("scheduler task entry: {} to publisher, total_run_count: {}, limit_run_time: {}".format(entry.name, entry.total_run_count + 1, entry.limit_run_time))
result = self.apply_async(entry) # 添加任务到worker队列
result = self.apply_async(entry, advance=False) # 添加任务到worker队列
debug('%s sent. id->%s', entry.task, result.id)
else:
info(
Expand Down Expand Up @@ -375,7 +397,7 @@ def info(self):


class RedisBeatManager(Scheduler):
Entry = CustomScheduleEntry
Entry = RedisScheduleEntry

def __init__(self, app):
self.schedule_url = app.conf.get("CELERY_BEAT_REDIS_SCHEDULER_URL", DEFAULT_CELERY_BEAT_REDIS_SCHEDULER_URL)
Expand All @@ -388,7 +410,15 @@ def __init__(self, app):
self.rdb = Redis.from_url(self.schedule_url)

def _when(self, entry, next_time_to_run, **kwargs):
    """Return a UTC timestamp for when *entry* should next run.

    Same implementation as ``RedisMultiScheduler._when`` (and celery's
    ``Scheduler._when``): an aware UTC timestamp with microsecond precision
    keeps ordering correct in non-UTC timezones, which the old
    naive-localtime version did not.
    """
    adjust = self.adjust
    as_now = maybe_make_aware(entry.default_now())
    return (mktime(as_now.utctimetuple()) +
            as_now.microsecond / 1e6 +
            (adjust(next_time_to_run) or 0))

def add(self, **kwargs):
e = self.Entry(app=current_app, **kwargs)
Expand Down

0 comments on commit dd590c8

Please sign in to comment.