Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use after hook #10

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ gem 'resque_solo'
class UpdateCat
include Resque::Plugins::UniqueJob
@queue = :cats
@lock_after_execution_period = 20

def self.perform(cat_id)
# do something
Expand Down
31 changes: 14 additions & 17 deletions lib/resque_ext/job.rb
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
module Resque
class Job
def perform_solo
res = nil
begin
res = perform_without_solo
ensure
ResqueSolo::Queue.mark_unqueued(@queue, self)
end
res
end

alias_method :perform_without_solo, :perform
alias_method :perform, :perform_solo

class << self
# Mark an item as queued
def create_solo(queue, klass, *args)
item = { class: klass.to_s, args: args }
if Resque.inline? || !ResqueSolo::Queue.is_unique?(item)
return create_without_solo(queue, klass, *args)
end
return "EXISTED" if ResqueSolo::Queue.queued?(queue, item)
create_return_value = false
# redis transaction block
Resque.redis.multi do
create_return_value = create_without_solo(queue, klass, *args)
ResqueSolo::Queue.mark_queued(queue, item)
end
create_return_value
end

# Mark an item as unqueued
def reserve_solo(queue)
item = reserve_without_solo(queue)
ResqueSolo::Queue.mark_unqueued(queue, item) if item && !Resque.inline?
item
ResqueSolo::Queue.mark_queued(queue, item) ? create_without_solo(queue, klass, *args) : false
end

# Mark destroyed jobs as unqueued
Expand All @@ -32,8 +31,6 @@ def destroy_solo(queue, klass, *args)

alias_method :create_without_solo, :create
alias_method :create, :create_solo
alias_method :reserve_without_solo, :reserve
alias_method :reserve, :reserve_solo
alias_method :destroy_without_solo, :destroy
alias_method :destroy, :destroy_solo
end
Expand Down
5 changes: 3 additions & 2 deletions lib/resque_solo/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ def queued?(queue, item)
end

def mark_queued(queue, item)
return unless is_unique?(item)
key = unique_key(queue, item)
redis.set(key, 1)
res = redis.setnx(key, 1)
return false unless res
ttl = item_ttl(item)
redis.expire(key, ttl) if ttl >= 0
res
end

def mark_unqueued(queue, job)
Expand Down
5 changes: 5 additions & 0 deletions lib/resque_solo/unique_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ def ttl
def lock_after_execution_period
@lock_after_execution_period ||= 0
end

def before_enqueue_solo_job(*args)
# This returns false if the key was already set
!ResqueSolo::Queue.queued?(@queue, { class: self.to_s, args: args })
end
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions resque_solo.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "bundler", "~> 1.6"
spec.add_development_dependency "fakeredis", "~> 0.4"
spec.add_development_dependency "minitest", "~> 5.8"
spec.add_development_dependency "minitest-reporters", "~> 1.1"
spec.add_development_dependency "rake", "~> 12.0"
spec.add_development_dependency "m", "~> 1.5"
end
48 changes: 48 additions & 0 deletions test/fake_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,51 @@ class UniqueJobWithLock
def self.perform(*_)
end
end

class EnqueueFailUniqueJob
include Resque::Plugins::UniqueJob
@queue = :unique

def self.perform(_)
end

def self.before_enqueue_fail
false
end
end

class EnqueueErrorUniqueJob
include Resque::Plugins::UniqueJob
@queue = :unique

def self.perform(_)
end

def self.before_enqueue_zzz_error
raise "Fail"
end
end

class DontPerformUniqueJob
include Resque::Plugins::UniqueJob
@queue = :unique

def self.perform(_)
end

def self.before_perform_dont
raise Resque::Job::DontPerform
end
end

class BeforePerformErrorUniqueJob
include Resque::Plugins::UniqueJob
@queue = :unique

def self.perform(_)
end

def self.before_perform_dont
raise "Fail"
end
end
25 changes: 21 additions & 4 deletions test/job_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class JobTest < MiniTest::Spec
Resque.enqueue FakeUniqueJob, "foo"
Resque.enqueue FakeUniqueJob, "foo"
assert_equal 1, Resque.size(:unique)
Resque.reserve(:unique)
perform_one_manually(:unique)
assert_equal 0, Resque.size(:unique)
Resque.enqueue FakeUniqueJob, "foo"
Resque.enqueue FakeUniqueJob, "foo"
Expand Down Expand Up @@ -48,13 +48,30 @@ class JobTest < MiniTest::Spec
it "mark jobs as unqueued when they raise an exception" do
2.times { Resque.enqueue(FailingUniqueJob, "foo") }
assert_equal 1, Resque.size(:unique)
worker = Resque::Worker.new(:unique)
worker.work 0
assert_raises { perform_one_manually(:unique) }
assert_equal 0, Resque.size(:unique)
2.times { Resque.enqueue(FailingUniqueJob, "foo") }
assert_equal 1, Resque.size(:unique)
end

it "mark jobs as unqueued when a before_perform filter raises an exception" do
2.times { Resque.enqueue(BeforePerformErrorUniqueJob, "foo") }
assert_equal 1, Resque.size(:unique)
assert_raises { perform_one_manually(:unique) }
assert_equal 0, Resque.size(:unique)
2.times { Resque.enqueue(BeforePerformErrorUniqueJob, "foo") }
assert_equal 1, Resque.size(:unique)
end

it "mark jobs as unqueued when a before_perform filter raises a DontPerform exception" do
2.times { Resque.enqueue(DontPerformUniqueJob, "foo") }
assert_equal 1, Resque.size(:unique)
assert_raises { perform_one_manually(:unique) }
assert_equal 0, Resque.size(:unique)
2.times { Resque.enqueue(DontPerformUniqueJob, "foo") }
assert_equal 1, Resque.size(:unique)
end

it "report if a unique job is enqueued" do
Resque.enqueue FakeUniqueJob, "foo"
assert Resque.enqueued?(FakeUniqueJob, "foo")
Expand Down Expand Up @@ -98,7 +115,7 @@ class JobTest < MiniTest::Spec

it "honor lock_after_execution_period in the redis key" do
Resque.enqueue UniqueJobWithLock
Resque.reserve(:unique_with_lock)
perform_one_manually(:unique_with_lock)
keys = Resque.redis.keys "solo:queue:unique_with_lock:job:*"
assert_equal 1, keys.length
assert_in_delta UniqueJobWithLock.lock_after_execution_period,
Expand Down
21 changes: 16 additions & 5 deletions test/resque_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,30 @@ class ResqueTest < MiniTest::Spec
describe "#enqueue_to" do
describe "non-unique job" do
it "should return true if job was enqueued" do
assert Resque.enqueue_to(:normal, FakeJob)
assert Resque.enqueue_to(:normal, FakeJob)
assert Resque.enqueue_to(:unique, FakeJob)
assert Resque.enqueue_to(:unique, FakeJob)
end
end

describe "unique job" do
it "should return true if job was enqueued" do
assert Resque.enqueue_to(:normal, FakeUniqueJob)
assert Resque.enqueue_to(:unique, FakeUniqueJob)
end

it "should return nil if job already existed" do
Resque.enqueue_to(:normal, FakeUniqueJob)
assert_nil Resque.enqueue_to(:normal, FakeUniqueJob)
Resque.enqueue_to(:unique, FakeUniqueJob)
assert Resque.enqueued?(FakeUniqueJob)
assert_nil Resque.enqueue_to(:unique, FakeUniqueJob)
end

it "should not mark enqueued if another before_enqueue hook fails" do
assert_nil Resque.enqueue_to(:unique, EnqueueFailUniqueJob), "Should not have been actually enqueued"
refute Resque.enqueued?(EnqueueFailUniqueJob), "Should not have been marked enqueued"
end

it "should not mark enqueued if another before_enqueue hook errors" do
assert_raises { Resque.enqueue_to(:unique, EnqueueErrorUniqueJob) }
refute Resque.enqueued?(EnqueueErrorUniqueJob), "Should not have been marked enqueued"
end
end
end
Expand Down
9 changes: 8 additions & 1 deletion test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@
end

require "minitest/autorun"
require "minitest/reporters"
require "resque_solo"
require "fake_jobs"
require "fakeredis"
require "fakeredis/minitest"
begin
require "pry-byebug"
rescue LoadError
# ignore
end

Minitest::Reporters.use! [Minitest::Reporters::DefaultReporter.new({ color: true })]

def perform_one_manually(queue_name)
Resque::Job.reserve(queue_name).perform
end