Worker options
The queue option associates a queue with a worker.
class MyWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'my_queue'

  def perform(sqs_msg, body); end
end
You can also pass a lambda to define the queue:
class MyWorker
  include Shoryuken::Worker
  shoryuken_options queue: -> { "#{ENV['APPLICATION_ENV']}_my_queue" }
  # shoryuken_options queue: -> { "#{Socket.gethostname}_my_queue" }

  def perform(sqs_msg, body); end
end
Or an array to associate multiple queues with a single worker:
class MyWorker
  include Shoryuken::Worker
  shoryuken_options queue: %w[queue1 queue2 queue3]
  # shoryuken_options queue: -> { load_queue_names } # a method that returns an array

  def perform(sqs_msg, body); end
end
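The three forms above (string, lambda, array) all boil down to a list of queue names. As a minimal sketch in plain Ruby, assuming a hypothetical helper named resolve_queues that is not part of Shoryuken's API, such an option could be normalized like this:

```ruby
# Hypothetical helper (not Shoryuken's API): normalize a `queue:` option
# into a flat array of queue names.
def resolve_queues(option)
  # A lambda is only called here, so ENV is read at resolution time
  # rather than when the worker class body is evaluated.
  value = option.respond_to?(:call) ? option.call : option
  Array(value).map(&:to_s)
end

ENV['APPLICATION_ENV'] ||= 'staging' # assumed value, for illustration only

single   = resolve_queues('my_queue')
dynamic  = resolve_queues(-> { "#{ENV['APPLICATION_ENV']}_my_queue" })
multiple = resolve_queues(%w[queue1 queue2 queue3])
```

The point of the lambda form is deferred evaluation: the queue name can depend on values that are only known when the process boots.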
When auto_delete is true, Shoryuken deletes messages after they are consumed, but only if the worker doesn't raise an exception during consumption. Default: false.
Because auto_delete is false by default, remember to set it to true or call sqs_msg.delete before the perform method returns; otherwise the messages will return to the queue and become available to be consumed again.
Note: auto_delete is true by default when using Active Job.
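The deletion rules above can be illustrated with a small simulation. This is a sketch only: FakeMessage and consume are hypothetical stand-ins written for this example, not Shoryuken classes, and no SQS call is made.

```ruby
# Stand-in for an SQS message; it only records whether delete was called.
FakeMessage = Struct.new(:body, :deleted) do
  def delete
    self.deleted = true
  end
end

# Hypothetical runner mimicking the documented behavior: with auto_delete
# the message is deleted only if the block returns without raising.
def consume(msg, auto_delete:)
  yield msg
  msg.delete if auto_delete
  :ok
rescue StandardError
  :failed # the message is left alone and would become visible again
end

succeeded = FakeMessage.new('fine', false)
failed    = FakeMessage.new('boom', false)
manual    = FakeMessage.new('manual', false)

consume(succeeded, auto_delete: true) { |m| m.body.upcase }
consume(failed, auto_delete: true) { |_m| raise 'processing failed' }
consume(manual, auto_delete: false) { |m| m.delete } # explicit delete in perform
```

After this runs, succeeded and manual are marked as deleted, while failed is not and would be redelivered.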
When batch is true, Shoryuken sends messages to the worker in batches instead of individually. Default: false.
One advantage of batch is working with APIs that accept batches, for example Keen IO.
The BatchableKeenIOWorker example below is more efficient than the KeenIOWorker. This efficiency isn't a general rule; it depends on each use case.
class KeenIOWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'keen_io', auto_delete: true, body_parser: :json

  def perform(sqs_msg, event)
    Keen.publish 'stats', event
  end
end

class BatchableKeenIOWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'keen_io', auto_delete: true, batch: true, body_parser: :json

  def perform(sqs_msgs, events)
    Keen.publish_batch 'stats' => events
  end
end
If you are using custom middleware, check this article; sqs_msg and body are arrays when batch is true.
Another thing to note about batchable workers: if one message in the batch causes an exception, auto_delete won't run for the messages that succeeded either. All of them will become available again, or be moved to a dead letter queue if you have one configured and they reach the maximum number of attempts.
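The all-or-nothing behavior of a batch can be sketched as follows. FakeBatchMessage and consume_batch are hypothetical names made up for this illustration; they only imitate the rule described above and do not reflect Shoryuken's internals.

```ruby
# Stand-in for an SQS message; it only records whether delete was called.
FakeBatchMessage = Struct.new(:body, :deleted) do
  def delete
    self.deleted = true
  end
end

# Hypothetical batch runner: the block receives arrays of messages and
# bodies, and deletion only happens if the whole batch succeeds.
def consume_batch(msgs)
  yield msgs, msgs.map(&:body)
  msgs.each(&:delete)
  true
rescue StandardError
  false # nothing is deleted, so every message would be redelivered
end

good_batch = %w[1 2 3].map { |b| FakeBatchMessage.new(b, false) }
bad_batch  = %w[1 2 oops].map { |b| FakeBatchMessage.new(b, false) }

# Integer('oops') raises ArgumentError, failing the whole second batch.
good_ok = consume_batch(good_batch) { |_msgs, bodies| bodies.each { |b| Integer(b) } }
bad_ok  = consume_batch(bad_batch)  { |_msgs, bodies| bodies.each { |b| Integer(b) } }
```

If partial progress matters, one option is to rescue inside perform and delete the successful messages yourself instead of relying on auto_delete.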
The body_parser option parses the body before the perform method is called. It accepts the symbols :json or :text, a block returning the body, or a class that responds to .parse. Default: :text.
class MyWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'default', body_parser: :json
  # shoryuken_options queue: 'default', body_parser: ->(sqs_msg){ REXML::Document.new(sqs_msg.body) }
  # shoryuken_options queue: 'default', body_parser: JSON

  def perform(sqs_msg, hash)
    puts hash['name']
    sqs_msg.delete
  end
end
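Since any class that responds to .parse is accepted, a custom parser can be quite small. The sketch below defines a hypothetical SymbolizedJSON module (the name is invented for this example) and assumes, as the body_parser: JSON line above suggests, that .parse receives the raw message body as a string.

```ruby
require 'json'

# Hypothetical parser: returns symbol keys instead of string keys.
# Anything responding to .parse(body) fits the description above.
module SymbolizedJSON
  def self.parse(body)
    JSON.parse(body, symbolize_names: true)
  end
end

parsed = SymbolizedJSON.parse('{"name":"Ryu","level":3}')
name   = parsed[:name]

# Assumed wiring, following the options shown above:
# shoryuken_options queue: 'default', body_parser: SymbolizedJSON
```

With this parser, perform would receive a hash it can read as hash[:name] rather than hash['name'].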
Although I strongly recommend setting the visibility timeout to a very pessimistic value, sometimes we can't control it. For these cases, you can enable auto_visibility_timeout by setting it to true. Default: false.
When it's enabled, Shoryuken resets the visibility timeout to its original value 5 seconds before it expires.
Be generous when configuring the default visibility_timeout for a queue. If your worker takes 2 minutes to consume a message in the worst case, set the visibility_timeout to at least 4 minutes. It doesn't hurt, and it is better than having the same message consumed more often than expected. http://www.pablocantero.com/blog/2014/11/29/sqs-to-the-rescue/
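The numbers in this section can be worked through in a few lines. This is arithmetic only, not Shoryuken code: both method names are hypothetical, and treating "2 minutes becomes at least 4 minutes" as a doubling rule is an assumption of this sketch.

```ruby
# Taken from the "5 seconds before" wording above.
EXTEND_UPFRONT_SECONDS = 5

# Seconds after receipt at which the timeout would first be reset
# when auto_visibility_timeout is enabled.
def first_extension_at(visibility_timeout)
  visibility_timeout - EXTEND_UPFRONT_SECONDS
end

# Assumed rule of thumb: at least twice the worst-case run time.
def suggested_visibility_timeout(worst_case_seconds)
  worst_case_seconds * 2
end

extension_point = first_extension_at(30)            # 25 seconds after receipt
suggested       = suggested_visibility_timeout(120) # 240 seconds, i.e. 4 minutes
```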
If a job raises an exception while consuming a message, by default the message becomes available to be consumed again after its visibility timeout expires.
If you want to change how long a failing message waits before it becomes available again, you can use retry_intervals to implement an exponential backoff.
class MyWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'default', retry_intervals: [300, 1200, 3600] # 5.minutes, 20.minutes and 1.hour

  def perform(sqs_msg, body); end
end
Keep in mind that AWS SQS does not officially support exponential backoff; Shoryuken implements it using the visibility timeout, which can be extended to a maximum of 12 hours.
You can continue to call ChangeMessageVisibility to extend the visibility timeout to a maximum of 12 hours. If you try to extend beyond 12 hours, the request will be rejected.
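To make the backoff idea concrete, here is a minimal sketch of how a delay could be picked from retry_intervals based on how many times a message has been received. The helper next_retry_interval is hypothetical and is not Shoryuken's implementation; reusing the last interval once the list is exhausted, and clamping a single interval to 12 hours, are simplifying assumptions of this example.

```ruby
# 43_200 seconds, the 12-hour limit mentioned above.
MAX_VISIBILITY_TIMEOUT = 12 * 60 * 60

# Hypothetical helper: choose the delay after the Nth delivery failed.
# receive_count is 1 for the first delivery; past the end of the list
# the last interval is reused (an assumption of this sketch).
def next_retry_interval(intervals, receive_count)
  interval = intervals[receive_count - 1] || intervals.last
  [interval, MAX_VISIBILITY_TIMEOUT].min
end

intervals = [300, 1200, 3600] # 5 minutes, 20 minutes, 1 hour

first_retry  = next_retry_interval(intervals, 1)  # 300
third_retry  = next_retry_interval(intervals, 3)  # 3600
later_retry  = next_retry_interval(intervals, 7)  # 3600, list exhausted
capped_retry = next_retry_interval([86_400], 1)   # 43_200, capped at 12 hours
```

In practice the chosen value would be applied by changing the message's visibility timeout, so the message stays hidden for that long before SQS delivers it again.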