Dynamic queue sharding #21
base: main
Conversation
```elixir
def start_link(args) do
  Task.start_link(__MODULE__, :start_consumers, [args])
end

def start_consumers(args) do
  start_consumers(args, 10)
end
```
Not so sure here what the best naming is: `args`, `queue_config`, `queues_config`, or something else.
As it is now, I suggest `queues_config` for consistency. However, I'm not against other namings, as long as it's consistent with other parts of the code (mainly in the supervisor).
```diff
@@ -3,30 +3,42 @@
 defmodule Mississippi.Consumer.AMQPDataConsumer.Supervisor do
   @moduledoc false
-  use Supervisor
+  use Horde.DynamicSupervisor
```
With the changes in this module, I think `AMQPDataConsumer.Supervisor` no longer has a reason to exist as it is now. I suggest we instead declare the supervisor directly in `ConsumersSupervisor` (just like `MessageTracker.Supervisor`); then we can have a single module (`AMQPDataConsumer.Starter`? I'm not too sold on the naming) to handle the children.
```diff
 queue_prefix = queues_config[:prefix]

-for queue_index <- queue_range_start..queue_range_end do
+for queue_index <- 0..(queue_total - 1) do
```
```suggestion
max_index = queue_total - 1
for queue_index <- 0..max_index do
```
nit: makes the code easier on the eyes IMO
```elixir
doc: """
The start index of the range of queues that this Mississippi consumer instance will handle.
This option is deprecated and will be ignored.
"""
```
As these options have been deprecated, I think we should also remove them from `test/integration/end_to_end.exs:38`.
I would leave them there until they are removed
```elixir
AMQPDataConsumer.Supervisor.start_children(args)

queue_total = args[:total_count]

child_count =
  AMQPDataConsumer.Supervisor |> DynamicSupervisor.which_children() |> Enum.count()

case child_count do
  ^queue_total ->
    :ok

  _ ->
    # TODO: do we want something more refined, e.g. exponential backoff?
    Process.sleep(:timer.seconds(2))
    start_consumers(args, retry - 1)
```
I know it's idempotent, but in my mind you can't possibly have all the children already started right after calling `start_children`, so this is bound to fail. How about switching the order instead: first check, then start if needed?
```suggestion
queue_total = args[:total_count]

child_count =
  AMQPDataConsumer.Supervisor |> DynamicSupervisor.which_children() |> Enum.count()

case child_count do
  ^queue_total ->
    :ok

  _ ->
    AMQPDataConsumer.Supervisor.start_children(args)
    # TODO: do we want something more refined, e.g. exponential backoff?
    Process.sleep(:timer.seconds(2))
    start_consumers(args, retry - 1)
```
Slightly refactored the code; `start_amqp_consumers(queues_config)` now contains a (blocking) `Enum.each` call that waits until all children are started. Therefore I think this comment is superseded.
```diff
@@ -0,0 +1,27 @@
+defmodule NodeListener do
```
- This module is missing the copyright
- If we're going through the burden of having a `NodeListener` anyway, I think we may as well also set the members of the Horde processes ourselves. This would give us slightly better efficiency, as we would have 1 listener process instead of 6, and we may also get better consistency with cluster membership, as a single listener process would control it for all Horde processes
After IRL discussion, we decided to keep it as is (copyright aside)
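For context, the single-listener approach floated above (and ultimately not adopted) could be sketched roughly as follows. The module name and the list of Horde process names are placeholders for illustration, not names from this codebase:

```elixir
defmodule NodeListenerSketch do
  use GenServer

  # Placeholder names for the Horde processes whose membership
  # a single listener would keep in sync.
  @hordes [MyApp.DistributedSupervisor, MyApp.DistributedRegistry]

  def start_link(_args), do: GenServer.start_link(__MODULE__, nil)

  @impl true
  def init(nil) do
    # Subscribe to cluster membership events ({:nodeup, node, info} etc.)
    :net_kernel.monitor_nodes(true, node_type: :visible)
    {:ok, nil}
  end

  @impl true
  def handle_info({event, _node, _info}, state) when event in [:nodeup, :nodedown] do
    # One listener updates the members of every Horde process, instead of
    # each Horde process watching the cluster on its own.
    nodes = [Node.self() | Node.list()]

    for horde <- @hordes do
      Horde.Cluster.set_members(horde, Enum.map(nodes, &{horde, &1}))
    end

    {:noreply, state}
  end
end
```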
```elixir
    :ok

  _ ->
    # TODO: do we want something more refined, e.g. exponential backoff?
```
Possibly even with some randomness so that we avoid all nodes performing these checks in sync
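A hedged sketch of what such a randomized backoff could look like, building on the 2-second base delay from the snippet above (the module and function names are hypothetical):

```elixir
defmodule BackoffSketch do
  # Exponential backoff with jitter: the base delay doubles on each
  # failed attempt (capped at max_ms), and up to 50% random jitter is
  # added so that nodes don't retry in lockstep.
  def delay_ms(attempt, base_ms \\ 2_000, max_ms \\ 30_000) do
    exp = min(base_ms * Integer.pow(2, attempt), max_ms)
    exp + :rand.uniform(div(exp, 2))
  end
end
```

`Process.sleep(BackoffSketch.delay_ms(attempt))` would then replace the fixed `Process.sleep(:timer.seconds(2))`.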
Use Horde's `process_redistribution` flag to enable automatic sharding (and resharding) of processes on cluster events. This allows for almost seamless autoscaling, as processes will be terminated and restarted on different Erlang nodes. Use the grain-like nature of AMQPDataConsumers to support spawning (re-)sharded consumers both on single and multi node Erlang clusters. Signed-off-by: Arnaldo Cesco <arnaldo.cesco@secomind.com>
Use Horde's `process_redistribution` flag to enable automatic sharding (and resharding) of processes on cluster events, e.g. a new node coming up. This allows for almost seamless autoscaling (processes will be terminated and restarted on different Erlang nodes).
Based on #24
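For readers unfamiliar with Horde: `process_redistribution` is passed as an option when starting the distributed supervisor. A minimal sketch (the supervisor name is an assumption; the other options shown are Horde defaults or common choices):

```elixir
# With process_redistribution: :active, Horde rebalances child processes
# across nodes whenever cluster membership changes; the default, :passive,
# only redistributes when a node dies.
Horde.DynamicSupervisor.start_link(
  name: MyApp.DistributedSupervisor,
  strategy: :one_for_one,
  members: :auto,
  process_redistribution: :active
)
```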