diff --git a/devops/settings.py b/devops/settings.py
index 948a548..1a83068 100644
--- a/devops/settings.py
+++ b/devops/settings.py
@@ -278,9 +278,10 @@
 2019-12-07 13:21:39,008
 You will notice that every run drifts by 10-30 ms (time spent in the scheduling logic); tasks with strict timing requirements should not use this task type
 cron tasks have not shown this problem so far
-Testing shows that adding "relative": True to an interval task fixes this problem
+Testing shows that after adding "relative": True to an interval task, celery running on Windows no longer has this problem
+but celery running on Linux only gets the first run down to roughly 1-10 ms; later runs still drift upward (the same with -P eventlet)
 """
-CELERY_BEAT_FLUSH_TASKS = True  # whether to clear existing tasks when beat starts
+CELERY_BEAT_FLUSH_TASKS = False  # whether to clear existing tasks when beat starts
 CELERY_TIMEZONE = TIME_ZONE  # celery uses UTC by default; set it to the same timezone as django
 CELERY_ENABLE_UTC = False
 USER_LOGS_KEEP_DAYS = 1095  # days to keep operation logs; only effective when the corresponding periodic task in CELERY_BEAT_SCHEDULE is enabled
diff --git a/redismultibeat/scheduler.py b/redismultibeat/scheduler.py
index 5fa52cb..b244f22 100644
--- a/redismultibeat/scheduler.py
+++ b/redismultibeat/scheduler.py
@@ -136,6 +136,11 @@ def __init__(self, *args, **kwargs):
             self.lock_ttl = app.conf.get("CELERY_BEAT_REDIS_LOCK_TTL", DEFAULT_CELERY_BEAT_REDIS_LOCK_TTL)
             self.lock_sleep = app.conf.get("CELERY_BEAT_REDIS_LOCK_SLEEP", DEFAULT_CELERY_BEAT_REDIS_LOCK_SLEEP)
             self.lock = self.rdb.lock(self.lock_key, timeout=self.lock_ttl)
+            self.tick = self.tick_with_lock
+            self.close = self.close_with_lock
+        else:
+            self.tick = self.tick_with_no_lock
+            self.close = self.close_with_no_lock
 
     def _when(self, entry, next_time_to_run, **kwargs):
         return mktime(entry.schedule.now().timetuple()) + (self.adjust(next_time_to_run) or 0)
@@ -238,23 +243,24 @@ def _tick(self):
             next_times.append(self.is_due(entry)[1])
         return min(next_times)
 
-    def tick(self, **kwargs):
+    def tick_with_lock(self, **kwargs):
         # called each time beat re-schedules the tasks
-        if self.multi_mode:
-            if self.lock.acquire(blocking=False):
-                # debug("Redis Lock acquired")
-                result = self._tick()
-                self.lock.release()
-                # debug("Redis Lock released")
-                return result
-            if self.lock_sleep is None:
-                next_time = random.randint(1, self.lock_ttl)  # lock not acquired: sleep a random number of seconds
-            else:  # or the configured value
-                next_time = self.lock_sleep
-            info("Redis Lock not acquired, sleep {} s".format(next_time))
-            return next_time
-        else:
-            return self._tick()
+        if self.lock.acquire(blocking=False):
+            # debug("Redis Lock acquired")
+            result = self._tick()
+            self.lock.release()
+            # debug("Redis Lock released")
+            return result
+        if self.lock_sleep is None:
+            next_time = random.randint(1, self.lock_ttl)  # lock not acquired: sleep a random number of seconds
+        else:  # or the configured value
+            next_time = self.lock_sleep
+        info("Redis Lock not acquired, sleep {} s".format(next_time))
+        return next_time
+
+    def tick_with_no_lock(self, **kwargs):
+        # called each time beat re-schedules the tasks
+        return self._tick()
 
     def remove_all(self):
         if self.rdb.exists(self.key):
@@ -265,13 +271,16 @@ def remove_all(self):
             warning("tasks key: {} is not exists in rdb".format(self.key))
             return False
 
-    def close(self):
+    def close_with_lock(self):
+        self.sync()
+        try:
+            self.lock.release()
+        except LockError:
+            pass
+        self.rdb.close()
+
+    def close_with_no_lock(self):
         self.sync()
-        if self.multi_mode:
-            try:
-                self.lock.release()
-            except LockError:
-                pass
         self.rdb.close()
 
     def sentinel_connect(self, master_name):
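
For reference, a minimal Python sketch of how the settings touched by this patch fit together in a Django settings module. Only the option names and the "None means sleep a random 1..lock_ttl seconds" behaviour come from the diff above; the entry name "clean-user-logs", the task path and the numeric values are illustrative assumptions.

# Hedged sketch; names and values below marked as assumed are not from the repo.
CELERY_BEAT_FLUSH_TASKS = False        # keep already stored tasks when beat starts
CELERY_BEAT_REDIS_LOCK_TTL = 60        # assumed: seconds the multi-beat Redis lock may be held
CELERY_BEAT_REDIS_LOCK_SLEEP = None    # None -> sleep random 1..lock_ttl seconds when the lock is busy

CELERY_BEAT_SCHEDULE = {
    "clean-user-logs": {                          # assumed entry name
        "task": "devops.tasks.clean_user_logs",   # assumed task path
        "schedule": 3600.0,                       # assumed interval: run every hour
        "relative": True,                         # the interval option discussed in the settings.py docstring
    },
}

On the scheduler side, binding self.tick/self.close to the _with_lock or _with_no_lock variants once in __init__ replaces the per-call multi_mode check, so the branch is evaluated only when the scheduler is constructed rather than on every beat tick.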