Skip to content

Commit

Permalink
Dynamic queue sharding
Browse files Browse the repository at this point in the history
Use Horde's `process_redistribution` flag to
enable automatic sharding (and resharding) of
processes on cluster events.
This allows for almost seamless autoscaling,
as processes will be terminated and restarted
on different Erlang nodes.

Use the grain-like nature of AMQPDataConsumers
to support spawining (re-)sharded consumers both
on single and multi node Erlang clusters.

Based on #15.

Signed-off-by: Arnaldo Cesco <arnaldo.cesco@secomind.com>
  • Loading branch information
Annopaolo committed Nov 26, 2024
1 parent eae72be commit b45162e
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 17 deletions.
2 changes: 1 addition & 1 deletion lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule Mississippi.Consumer do

channels_per_connection = amqp_consumer_options[:channels]

queue_count = queue_config[:range_end] - queue_config[:range_start] + 1
queue_count = queue_config[:total_count]

# Invariant: we use one channel for one queue.
connection_number = Kernel.ceil(queue_count / channels_per_connection)
Expand Down
30 changes: 20 additions & 10 deletions lib/consumer/amqp_data_consumer_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,29 +1,39 @@
defmodule Mississippi.Consumer.AMQPDataConsumer.Supervisor do
@moduledoc false
use Supervisor
use Horde.DynamicSupervisor

alias Horde.DynamicSupervisor
alias Mississippi.Consumer.AMQPDataConsumer

def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
DynamicSupervisor.start_link(__MODULE__, init_arg,
name: __MODULE__,
distribution_strategy: Horde.UniformQuorumDistribution
)
end

@impl true
def init(init_arg) do
children =
amqp_data_consumers_childspecs(init_arg[:queues_config])
def init(_init_arg) do
DynamicSupervisor.init(
members: :auto,
strategy: :one_for_one,
process_redistribution: :active
)
end

opts = [strategy: :one_for_one]
def start_consumers(queues_config) do
children = amqp_data_consumers_childspecs(queues_config)

Supervisor.init(children, opts)
Enum.each(children, fn child ->
DynamicSupervisor.start_child(Mississippi.Consumer.AMQPDataConsumer.Supervisor, child)
end)
end

defp amqp_data_consumers_childspecs(queues_config) do
queue_range_start = queues_config[:range_start]
queue_range_end = queues_config[:range_end]
queue_total = queues_config[:total_count]
queue_prefix = queues_config[:prefix]

for queue_index <- queue_range_start..queue_range_end do
for queue_index <- 0..(queue_total - 1) do
queue_name = "#{queue_prefix}#{queue_index}"

init_args = [
Expand Down
18 changes: 14 additions & 4 deletions lib/consumer/consumers_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,25 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do
{Registry, [keys: :unique, name: MessageTracker.Registry, members: :auto]},
{Registry, [keys: :unique, name: AMQPDataConsumer.Registry, members: :auto]},
{DataUpdater.Supervisor, message_handler: message_handler},
{DynamicSupervisor, strategy: :one_for_one, name: MessageTracker.Supervisor, members: :auto},
{AMQPDataConsumer.Supervisor, queues_config: queues_config}
{DynamicSupervisor,
strategy: :one_for_one,
name: MessageTracker.Supervisor,
members: :auto,
process_redistribution: :active,
distribution_strategy: Horde.UniformQuorumDistribution},
{AMQPDataConsumer.Supervisor, queues_config: queues_config},
# This will make queues start after re-sharding in a multi-node cluster
{NodeListener, queues_config},
# This will make queues start in a single-node cluster
{Task, fn -> AMQPDataConsumer.Supervisor.start_consumers(queues_config) end}
]

opts = [strategy: :rest_for_one]

Supervisor.init(children, opts)
end

# TODO find out a suitable set of topologies
defp cluster_topologies do
[]
end
Expand Down Expand Up @@ -68,16 +78,16 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do
],
range_start: [
type: :non_neg_integer,
default: 0,
doc: """
The start index of the range of queues that this Mississippi consumer instance will handle.
This option is deprecated and will be ignored.
"""
],
range_end: [
type: :non_neg_integer,
default: 127,
doc: """
The end index of the range of queues that this Mississippi consumer instance will handle.
This option is deprecated and will be ignored.
"""
],
prefix: [
Expand Down
4 changes: 4 additions & 0 deletions lib/consumer/data_updater.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ defmodule Mississippi.Consumer.DataUpdater do
end
end

def start_link(start_args) do
start_link({:message_handler, Mississippi.Consumer.DataUpdater.Handler.Impl}, start_args)
end

def start_link(extra_args, start_args) do
{:message_handler, message_handler} = extra_args
sharding_key = Keyword.fetch!(start_args, :sharding_key)
Expand Down
13 changes: 11 additions & 2 deletions lib/consumer/data_updater_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,22 @@ defmodule Mississippi.Consumer.DataUpdater.Supervisor do
require Logger

def start_link(init_args) do
DynamicSupervisor.start_link(__MODULE__, init_args, name: __MODULE__)
DynamicSupervisor.start_link(__MODULE__, init_args,
name: __MODULE__,
distribution_strategy: UniformQuorumDistribution
)
end

@impl true
def init(init_args) do
_ = Logger.info("Starting DataUpdater supervisor")
DynamicSupervisor.init(strategy: :one_for_one, members: :auto, extra_arguments: init_args)

DynamicSupervisor.init(
strategy: :one_for_one,
members: :auto,
process_redistribution: :active,
extra_arguments: init_args
)
end

def start_child(child) do
Expand Down
27 changes: 27 additions & 0 deletions lib/consumer/node_listener.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule NodeListener do
@moduledoc false
use GenServer

alias Mississippi.Consumer.AMQPDataConsumer

require Logger

def start_link(args), do: GenServer.start_link(__MODULE__, args)

def init(queues_config) do
:net_kernel.monitor_nodes(true, node_type: :visible)
{:ok, queues_config}
end

def handle_info({:nodeup, node, node_type}, queues_config) do
_ = Logger.info("Node #{inspect(node)} of type #{inspect(node_type)} is up")
_ = AMQPDataConsumer.Supervisor.start_consumers(queues_config)
{:noreply, queues_config}
end

def handle_info({:nodedown, node, node_type}, queues_config) do
_ = Logger.info("Node #{inspect(node)} of type #{inspect(node_type)} is down")
_ = AMQPDataConsumer.Supervisor.start_consumers(queues_config)
{:noreply, queues_config}
end
end

0 comments on commit b45162e

Please sign in to comment.