diff --git a/README.md b/README.md index a719209..d712066 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ gem 'resque_solo' class UpdateCat include Resque::Plugins::UniqueJob @queue = :cats + @lock_after_execution_period = 20 def self.perform(cat_id) # do something diff --git a/lib/resque_ext/job.rb b/lib/resque_ext/job.rb index cce85b3..c4208c0 100644 --- a/lib/resque_ext/job.rb +++ b/lib/resque_ext/job.rb @@ -1,5 +1,18 @@ module Resque class Job + def perform_solo + res = nil + begin + res = perform_without_solo + ensure + ResqueSolo::Queue.mark_unqueued(@queue, self) + end + res + end + + alias_method :perform_without_solo, :perform + alias_method :perform, :perform_solo + class << self # Mark an item as queued def create_solo(queue, klass, *args) @@ -7,21 +20,7 @@ def create_solo(queue, klass, *args) if Resque.inline? || !ResqueSolo::Queue.is_unique?(item) return create_without_solo(queue, klass, *args) end - return "EXISTED" if ResqueSolo::Queue.queued?(queue, item) - create_return_value = false - # redis transaction block - Resque.redis.multi do - create_return_value = create_without_solo(queue, klass, *args) - ResqueSolo::Queue.mark_queued(queue, item) - end - create_return_value - end - - # Mark an item as unqueued - def reserve_solo(queue) - item = reserve_without_solo(queue) - ResqueSolo::Queue.mark_unqueued(queue, item) if item && !Resque.inline? - item + ResqueSolo::Queue.mark_queued(queue, item) ? create_without_solo(queue, klass, *args) : false end # Mark destroyed jobs as unqueued @@ -32,8 +31,6 @@ def destroy_solo(queue, klass, *args) alias_method :create_without_solo, :create alias_method :create, :create_solo - alias_method :reserve_without_solo, :reserve - alias_method :reserve, :reserve_solo alias_method :destroy_without_solo, :destroy alias_method :destroy, :destroy_solo end diff --git a/lib/resque_solo/queue.rb b/lib/resque_solo/queue.rb index a405330..93234df 100644 --- a/lib/resque_solo/queue.rb +++ b/lib/resque_solo/queue.rb @@ -7,11 +7,12 @@ def queued?(queue, item) end def mark_queued(queue, item) - return unless is_unique?(item) key = unique_key(queue, item) - redis.set(key, 1) + res = redis.setnx(key, 1) + return false unless res ttl = item_ttl(item) redis.expire(key, ttl) if ttl >= 0 + res end def mark_unqueued(queue, job) diff --git a/lib/resque_solo/unique_job.rb b/lib/resque_solo/unique_job.rb index 7d55af4..f6509f3 100644 --- a/lib/resque_solo/unique_job.rb +++ b/lib/resque_solo/unique_job.rb @@ -45,6 +45,11 @@ def ttl def lock_after_execution_period @lock_after_execution_period ||= 0 end + + def before_enqueue_solo_job(*args) + # This returns false if the key was already set + !ResqueSolo::Queue.queued?(@queue, { class: self.to_s, args: args }) + end end end end diff --git a/resque_solo.gemspec b/resque_solo.gemspec index afe730c..a8e25d9 100644 --- a/resque_solo.gemspec +++ b/resque_solo.gemspec @@ -20,5 +20,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "bundler", "~> 1.6" spec.add_development_dependency "fakeredis", "~> 0.4" spec.add_development_dependency "minitest", "~> 5.8" + spec.add_development_dependency "minitest-reporters", "~> 1.1" spec.add_development_dependency "rake", "~> 12.0" + spec.add_development_dependency "m", "~> 1.5" end diff --git a/test/fake_jobs.rb b/test/fake_jobs.rb index 0e01ae8..e4e1ff0 100644 --- a/test/fake_jobs.rb +++ b/test/fake_jobs.rb @@ -36,3 +36,51 @@ class UniqueJobWithLock def self.perform(*_) end end + +class EnqueueFailUniqueJob + include Resque::Plugins::UniqueJob + @queue = :unique + + def self.perform(_) + end + + def self.before_enqueue_fail + false + end +end + +class EnqueueErrorUniqueJob + include Resque::Plugins::UniqueJob + @queue = :unique + + def self.perform(_) + end + + def self.before_enqueue_zzz_error + raise "Fail" + end +end + +class DontPerformUniqueJob + include Resque::Plugins::UniqueJob + @queue = :unique + + def self.perform(_) + end + + def self.before_perform_dont + raise Resque::Job::DontPerform + end +end + +class BeforePerformErrorUniqueJob + include Resque::Plugins::UniqueJob + @queue = :unique + + def self.perform(_) + end + + def self.before_perform_dont + raise "Fail" + end +end \ No newline at end of file diff --git a/test/job_test.rb b/test/job_test.rb index ae1c5d5..56c0097 100644 --- a/test/job_test.rb +++ b/test/job_test.rb @@ -15,7 +15,7 @@ class JobTest < MiniTest::Spec Resque.enqueue FakeUniqueJob, "foo" Resque.enqueue FakeUniqueJob, "foo" assert_equal 1, Resque.size(:unique) - Resque.reserve(:unique) + perform_one_manually(:unique) assert_equal 0, Resque.size(:unique) Resque.enqueue FakeUniqueJob, "foo" Resque.enqueue FakeUniqueJob, "foo" @@ -48,13 +48,30 @@ class JobTest < MiniTest::Spec it "mark jobs as unqueued when they raise an exception" do 2.times { Resque.enqueue(FailingUniqueJob, "foo") } assert_equal 1, Resque.size(:unique) - worker = Resque::Worker.new(:unique) - worker.work 0 + assert_raises { perform_one_manually(:unique) } assert_equal 0, Resque.size(:unique) 2.times { Resque.enqueue(FailingUniqueJob, "foo") } assert_equal 1, Resque.size(:unique) end + it "mark jobs as unqueued when a before_perform filter raises an exception" do + 2.times { Resque.enqueue(BeforePerformErrorUniqueJob, "foo") } + assert_equal 1, Resque.size(:unique) + assert_raises { perform_one_manually(:unique) } + assert_equal 0, Resque.size(:unique) + 2.times { Resque.enqueue(BeforePerformErrorUniqueJob, "foo") } + assert_equal 1, Resque.size(:unique) + end + + it "mark jobs as unqueued when a before_perform filter raises a DontPerform exception" do + 2.times { Resque.enqueue(DontPerformUniqueJob, "foo") } + assert_equal 1, Resque.size(:unique) + assert_raises { perform_one_manually(:unique) } + assert_equal 0, Resque.size(:unique) + 2.times { Resque.enqueue(DontPerformUniqueJob, "foo") } + assert_equal 1, Resque.size(:unique) + end + it "report if a unique job is enqueued" do Resque.enqueue FakeUniqueJob, "foo" assert Resque.enqueued?(FakeUniqueJob, "foo") @@ -98,7 +115,7 @@ class JobTest < MiniTest::Spec it "honor lock_after_execution_period in the redis key" do Resque.enqueue UniqueJobWithLock - Resque.reserve(:unique_with_lock) + perform_one_manually(:unique_with_lock) keys = Resque.redis.keys "solo:queue:unique_with_lock:job:*" assert_equal 1, keys.length assert_in_delta UniqueJobWithLock.lock_after_execution_period, diff --git a/test/resque_test.rb b/test/resque_test.rb index 7079086..1bf408c 100644 --- a/test/resque_test.rb +++ b/test/resque_test.rb @@ -26,19 +26,30 @@ class ResqueTest < MiniTest::Spec describe "#enqueue_to" do describe "non-unique job" do it "should return true if job was enqueued" do - assert Resque.enqueue_to(:normal, FakeJob) - assert Resque.enqueue_to(:normal, FakeJob) + assert Resque.enqueue_to(:unique, FakeJob) + assert Resque.enqueue_to(:unique, FakeJob) end end describe "unique job" do it "should return true if job was enqueued" do - assert Resque.enqueue_to(:normal, FakeUniqueJob) + assert Resque.enqueue_to(:unique, FakeUniqueJob) end it "should return nil if job already existed" do - Resque.enqueue_to(:normal, FakeUniqueJob) - assert_nil Resque.enqueue_to(:normal, FakeUniqueJob) + Resque.enqueue_to(:unique, FakeUniqueJob) + assert Resque.enqueued?(FakeUniqueJob) + assert_nil Resque.enqueue_to(:unique, FakeUniqueJob) + end + + it "should not mark enqueued if another before_enqueue hook fails" do + assert_nil Resque.enqueue_to(:unique, EnqueueFailUniqueJob), "Should not have been actually enqueued" + refute Resque.enqueued?(EnqueueFailUniqueJob), "Should not have been marked enqueued" + end + + it "should not mark enqueued if another before_enqueue hook errors" do + assert_raises { Resque.enqueue_to(:unique, EnqueueErrorUniqueJob) } + refute Resque.enqueued?(EnqueueErrorUniqueJob), "Should not have been marked enqueued" end end end diff --git a/test/test_helper.rb b/test/test_helper.rb index 67a7e1a..6d77005 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -4,11 +4,18 @@ end require "minitest/autorun" +require "minitest/reporters" require "resque_solo" require "fake_jobs" -require "fakeredis" +require "fakeredis/minitest" begin require "pry-byebug" rescue LoadError # ignore end + +Minitest::Reporters.use! [Minitest::Reporters::DefaultReporter.new({ color: true })] + +def perform_one_manually(queue_name) + Resque::Job.reserve(queue_name).perform +end \ No newline at end of file