Skip to content

Commit

Permalink
Merge pull request #35 from bibendi/durable-redis
Browse files Browse the repository at this point in the history
feat: don't fail when redis is down
  • Loading branch information
bibendi authored Dec 16, 2022
2 parents d47addd + e7d0386 commit eb1458a
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 10 deletions.
15 changes: 13 additions & 2 deletions lib/schked/redis_locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,30 @@ module Schked
class RedisLocker
attr_reader :lock_manager,
:lock_id,
:lock_ttl
:lock_ttl,
:logger

LOCK_KEY = "schked:redis_locker"
LOCK_TTL = 60_000 # ms

def initialize(redis_servers, lock_ttl: LOCK_TTL)
def initialize(redis_servers, lock_ttl: LOCK_TTL, logger: Logger.new($stdout))
@lock_manager = Redlock::Client.new(redis_servers, retry_count: 0)
@lock_ttl = lock_ttl
@logger = logger
end

def lock
valid_lock? || !!try_lock
rescue => e
logger.error("Failed to acquire a lock with error: #{e.message}")
false
end

def unlock
lock_manager.unlock(lock_id) if valid_lock?
rescue => e
logger.error("Failed to release the lock with error: #{e.message}")
false
end

def extend_lock
Expand All @@ -28,6 +36,9 @@ def extend_lock
@lock_id = lock_manager.lock(LOCK_KEY, lock_ttl, extend: lock_id, extend_only_if_locked: true)

!!@lock_id
rescue => e
logger.error("Failed to extend the lock with error: #{e.message}")
false
end

def valid_lock?
Expand Down
2 changes: 1 addition & 1 deletion lib/schked/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Schked
VERSION = "1.1.1"
VERSION = "1.1.2"
end
17 changes: 10 additions & 7 deletions lib/schked/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class Worker
def initialize(config:)
@config = config

@locker = RedisLocker.new(config.redis_servers, lock_ttl: 40_000) unless config.standalone?
@locker = RedisLocker.new(config.redis_servers, lock_ttl: 40_000, logger: config.logger) unless config.standalone?
@scheduler = Rufus::Scheduler.new(trigger_lock: locker)

watch_signals
Expand Down Expand Up @@ -46,26 +46,29 @@ def schedule
def define_callbacks
cfg = config

scheduler.define_singleton_method(:on_error) do |job, error|
name = if job
scheduler.define_singleton_method(:extract_job_name) do |job|
if job
job.opts[:as] || job.job_id
else
"unknown"
end
cfg.logger.fatal("Task #{name} failed with error: #{error.message}")
end

scheduler.define_singleton_method(:on_error) do |job, error|
cfg.logger.fatal("Task #{extract_job_name(job)} failed with error: #{error.message}")
cfg.logger.error(error.backtrace.join("\n")) if error.backtrace

cfg.fire_callback(:on_error, job, error)
end

scheduler.define_singleton_method(:on_pre_trigger) do |job, time|
cfg.logger.info("Started task: #{job.opts[:as] || job.job_id}")
cfg.logger.info("Started task: #{extract_job_name(job)}")

cfg.fire_callback(:before_start, job, time)
end

scheduler.define_singleton_method(:on_post_trigger) do |job, time|
cfg.logger.info("Finished task: #{job.opts[:as] || job.job_id}")
cfg.logger.info("Finished task: #{extract_job_name(job)}")

cfg.fire_callback(:after_finish, job, time)
end
Expand All @@ -85,7 +88,7 @@ def watch_signals
Thread.new do
loop do
scheduler.shutdown(wait: 5) if @shutdown
sleep 0.2
sleep 1
end
end
end
Expand Down

0 comments on commit eb1458a

Please sign in to comment.