Worker options
shoryuken_options is only available in Shoryuken workers. It isn't supported in Active Job.
The queue option associates a queue with a worker.
class HelloWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'hello'
end
You can also pass a lambda to compute the queue name:
shoryuken_options queue: ->{ "#{ENV['RAILS_ENV']}-hello" }
shoryuken_options queue: ->{ "#{Socket.gethostname}-hello" }
Or an array to associate multiple queues with a single worker:
shoryuken_options queue: %w[queue1 queue2 queue3]
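To make the three accepted forms concrete, here is a minimal plain-Ruby sketch that normalizes a queue option into a list of queue names. The resolve_queues helper is hypothetical and only illustrates the idea; it is not Shoryuken's internal code.

```ruby
# Hypothetical helper: turns a string, a lambda, or an array into a flat
# list of queue names. Illustration only, not part of Shoryuken's API.
def resolve_queues(queue)
  queue = queue.call if queue.respond_to?(:call) # lambdas are evaluated first
  Array(queue).flatten.compact.map(&:to_s)
end

env_queue = -> { "#{ENV.fetch('RAILS_ENV', 'development')}-hello" }

p resolve_queues('hello')
p resolve_queues(env_queue)
p resolve_queues(%w[queue1 queue2 queue3])
```

A lambda is handy when the queue name depends on the environment, because the name is computed when the lambda is called rather than hard-coded.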
When auto_delete is enabled, Shoryuken automatically deletes messages after they are consumed, but only if the worker doesn't raise an exception while consuming them. Default: false.
As auto_delete is false by default, remember to set it to true or call sqs_msg.delete before the perform method ends. Otherwise the messages will return to the queue and become available to be consumed again.
Note: auto_delete is true by default when using Active Job.
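As a rough sketch of the manual approach, the example below deletes the message only after processing succeeds. FakeSqsMessage is a stand-in invented for this example so it runs without SQS, and ManualDeleteWorker is a plain Ruby class that only mirrors the perform signature; in a real worker you would include Shoryuken::Worker and receive the actual message object.

```ruby
# Stand-in for the SQS message object; only body and delete are modeled.
FakeSqsMessage = Struct.new(:body, :deleted) do
  def delete
    self.deleted = true
  end
end

# Plain Ruby class mirroring a worker's perform signature.
class ManualDeleteWorker
  def perform(sqs_msg, body)
    process(body)  # may raise; in that case delete is never reached
    sqs_msg.delete # acknowledge only after successful processing
  end

  private

  # Hypothetical processing step that rejects empty bodies.
  def process(body)
    raise ArgumentError, 'empty body' if body.to_s.empty?
    body.upcase
  end
end

msg = FakeSqsMessage.new('hello', false)
ManualDeleteWorker.new.perform(msg, msg.body)
puts msg.deleted
```

Calling delete as the last step means a message whose processing raised is left alone and returns to the queue after its visibility timeout.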
When batch is true, Shoryuken sends the messages in batches to the worker instead of individually. Default: false.
One advantage of batch is working with APIs that accept batches, for example Keen IO.
The BatchableKeenIOWorker example below is more efficient than the KeenIOWorker. This efficiency isn't a general rule; it depends on each use case.
class KeenIOWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'keen_io', auto_delete: true, body_parser: :json

  def perform(sqs_msg, event)
    Keen.publish 'stats', event
  end
end

class BatchableKeenIOWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'keen_io', auto_delete: true, batch: true, body_parser: :json

  def perform(sqs_msgs, events)
    Keen.publish_batch 'stats' => events
  end
end
If you are using custom middleware, check this article; sqs_msg and body are arrays when batch=true.
Another important point about batchable workers: if one message in a batch causes an exception, auto_delete won't run for the messages that succeeded either. All messages in the batch become available again, or are moved to a dead letter queue if you have one configured and they reach the maximum number of attempts.
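One possible way to limit the blast radius of a single bad message is to leave auto_delete off and delete each message yourself as it succeeds. The sketch below shows that idea in plain Ruby; StubMessage and PerMessageBatchWorker are hypothetical names for this example, and the handle method stands in for your own processing.

```ruby
# Stand-in for an SQS message; only body and delete are modeled.
StubMessage = Struct.new(:body, :deleted) do
  def delete
    self.deleted = true
  end
end

# Plain Ruby class with the same perform signature as a batch worker.
# Assumes auto_delete is left at false, so deletion is explicit.
class PerMessageBatchWorker
  def perform(sqs_msgs, bodies)
    failures = []
    sqs_msgs.zip(bodies).each do |sqs_msg, body|
      begin
        handle(body)
        sqs_msg.delete # only successful messages are removed
      rescue StandardError => e
        failures << e
      end
    end
    # Re-raise the first error so the failure is still reported.
    raise failures.first unless failures.empty?
  end

  private

  # Hypothetical processing step that rejects nil bodies.
  def handle(body)
    raise ArgumentError, "bad event: #{body.inspect}" if body.nil?
  end
end
```

With this shape, only the messages that failed stay in the queue and are retried. The trade-off is one delete call per message instead of a single delete for the whole batch.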
The body_parser option lets you parse the body before the perform method is called. It accepts the symbols :json or :text, a lambda that returns the body, or a class that responds to .parse. Default: :text.
class MyWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'default', body_parser: :json
  # shoryuken_options queue: 'default', body_parser: ->(sqs_msg){ REXML::Document.new(sqs_msg.body) }
  # shoryuken_options queue: 'default', body_parser: JSON

  def perform(sqs_msg, hash)
    puts hash['name']
    sqs_msg.delete
  end
end
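For the "class that responds to .parse" form, a custom parser might look like the sketch below. KeyValueParser and its "key=value;key=value" format are invented for this example. It assumes the parser receives the raw body string, as the body_parser: JSON line above suggests.

```ruby
# Hypothetical parser for bodies such as "name=Ken; level=3".
# Assumption: parse is called with the raw message body as a string.
class KeyValueParser
  def self.parse(body)
    body.to_s.split(';').each_with_object({}) do |pair, hash|
      key, value = pair.split('=', 2)
      next if key.nil? || key.strip.empty? # skip empty segments
      hash[key.strip] = value.to_s.strip
    end
  end
end

p KeyValueParser.parse('name=Ken; level=3')
```

A worker would then declare shoryuken_options queue: 'default', body_parser: KeyValueParser and receive the resulting hash as the second argument to perform.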
Although I strongly recommend setting the visibility timeout to a very pessimistic value, sometimes we can't control it. For these cases, you can enable auto_visibility_timeout by setting it to true. Default: false.
When it's enabled, Shoryuken resets the visibility timeout to its original value 5 seconds before it expires.
Be generous when configuring the default visibility_timeout for a queue. If your worker takes 2 minutes to consume a message in the worst case, set the visibility_timeout to at least 4 minutes. It doesn't hurt, and it is better than having the same message consumed more often than expected. http://www.pablocantero.com/blog/2014/11/29/sqs-to-the-rescue/
Note: This feature isn't supported when using batch=true.
If a worker raises an exception while consuming a message, by default the message becomes available to be consumed again after its visibility timeout expires.
If you want to increase or decrease the delay before a failing message becomes available again, you can use retry_intervals to implement an exponential backoff.
Note: This feature isn't supported when using batch=true.
shoryuken_options retry_intervals: [300, 1200, 3600] # 5.minutes, 20.minutes and 1.hour
shoryuken_options retry_intervals: ->(attempts) { calculate_next_attempt_interval(attempts) }
Keep in mind that Amazon SQS does not officially support exponential backoff; Shoryuken implements it using the visibility timeout, which can be extended to a maximum of 12 hours. If your interval exceeds the 12-hour maximum, the middleware automatically limits it to the maximum.
You can continue to call ChangeMessageVisibility to extend the visibility timeout to a maximum of 12 hours. If you try to extend beyond 12 hours, the request will be rejected.
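The calculate_next_attempt_interval helper used in the lambda example is not defined on this page. A minimal sketch of what it could look like follows, assuming attempts is 1 for the first failure. The base and factor defaults are arbitrary values chosen for illustration. The helper caps the result at 12 hours itself, even though the middleware would also limit it.

```ruby
MAX_VISIBILITY_TIMEOUT = 12 * 60 * 60 # 43,200 seconds, the 12-hour maximum

# Hypothetical implementation of the helper referenced above.
# Assumption: attempts is 1 for the first failure, 2 for the second, etc.
def calculate_next_attempt_interval(attempts, base: 300, factor: 4)
  exponent = [attempts.to_i - 1, 0].max
  [base * (factor**exponent), MAX_VISIBILITY_TIMEOUT].min
end

(1..5).each do |n|
  puts "attempt #{n}: #{calculate_next_attempt_interval(n)}s"
end
```

With these defaults the intervals are 300, 1200, 4800 and 19200 seconds for the first four attempts, and the fifth is capped at 43200 seconds.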