Skip to content

Commit

Permalink
优化RedisMultiScheduler多实例模式
Browse files Browse the repository at this point in the history
  • Loading branch information
leffss committed Jul 31, 2022
1 parent 0d36d35 commit ac18496
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 24 deletions.
5 changes: 3 additions & 2 deletions devops/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,10 @@
2019-12-07 13:21:39,008
你会发现每次执行时间都会延迟 10-30 毫秒之间(程序执行逻辑耗费的时间),如果任务有严格时间要求,则不适合使用这种类型的任务
cron 任务暂时没发现这个问题
经过测试发现在 interval 任务种加入 "relative": True 即可解决此问题
经过测试发现在 interval 任务种加入 "relative": True 后,windows 上面运行的celery可以解决此问题
但是 linux 下运行的 celery 只有第一次任务的毫秒变成1-10的样子,后面还是会递增(使用 -P eventlet 一样)
"""
CELERY_BEAT_FLUSH_TASKS = True # 启动 beat 时是否清空已有任务
CELERY_BEAT_FLUSH_TASKS = False # 启动 beat 时是否清空已有任务
CELERY_TIMEZONE = TIME_ZONE # celery 使用的是 utc 时间,需要设置为 django 相同时区
CELERY_ENABLE_UTC = False
USER_LOGS_KEEP_DAYS = 1095 # 操作日志保留天数,需开启 CELERY_BEAT_SCHEDULE 中相应的定时任务才生效
Expand Down
53 changes: 31 additions & 22 deletions redismultibeat/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ def __init__(self, *args, **kwargs):
self.lock_ttl = app.conf.get("CELERY_BEAT_REDIS_LOCK_TTL", DEFAULT_CELERY_BEAT_REDIS_LOCK_TTL)
self.lock_sleep = app.conf.get("CELERY_BEAT_REDIS_LOCK_SLEEP", DEFAULT_CELERY_BEAT_REDIS_LOCK_SLEEP)
self.lock = self.rdb.lock(self.lock_key, timeout=self.lock_ttl)
self.tick = self.tick_with_lock
self.close = self.close_with_lock
else:
self.tick = self.tick_with_no_lock
self.close = self.close_with_no_lock

def _when(self, entry, next_time_to_run, **kwargs):
return mktime(entry.schedule.now().timetuple()) + (self.adjust(next_time_to_run) or 0)
Expand Down Expand Up @@ -238,23 +243,24 @@ def _tick(self):
next_times.append(self.is_due(entry)[1])
return min(next_times)

def tick(self, **kwargs):
def tick_with_lock(self, **kwargs):
# 每个任务重新调度会执行
if self.multi_mode:
if self.lock.acquire(blocking=False):
# debug("Redis Lock acquired")
result = self._tick()
self.lock.release()
# debug("Redis Lock released")
return result
if self.lock_sleep is None:
next_time = random.randint(1, self.lock_ttl) # 如果未获取到锁,随机 sleep
else: # 或者 sleep 设置的值
next_time = self.lock_sleep
info("Redis Lock not acquired, sleep {} s".format(next_time))
return next_time
else:
return self._tick()
if self.lock.acquire(blocking=False):
# debug("Redis Lock acquired")
result = self._tick()
self.lock.release()
# debug("Redis Lock released")
return result
if self.lock_sleep is None:
next_time = random.randint(1, self.lock_ttl) # 如果未获取到锁,随机 sleep
else: # 或者 sleep 设置的值
next_time = self.lock_sleep
info("Redis Lock not acquired, sleep {} s".format(next_time))
return next_time

def tick_with_no_lock(self, **kwargs):
# 每个任务重新调度会执行
return self._tick()

def remove_all(self):
if self.rdb.exists(self.key):
Expand All @@ -265,13 +271,16 @@ def remove_all(self):
warning("tasks key: {} is not exists in rdb".format(self.key))
return False

def close(self):
def close_with_lock(self):
self.sync()
try:
self.lock.release()
except LockError:
pass
self.rdb.close()

def close_with_no_lock(self):
self.sync()
if self.multi_mode:
try:
self.lock.release()
except LockError:
pass
self.rdb.close()

def sentinel_connect(self, master_name):
Expand Down

0 comments on commit ac18496

Please sign in to comment.