Skip to content

Commit

Permalink
Dynamic queue sharding
Browse files Browse the repository at this point in the history
Use Horde's `process_redistribution` flag to enable automatic
sharding (and resharding) of processes on cluster events.
This allows for almost seamless autoscaling, as processes
will be terminated and restarted on different Erlang nodes.

Use the grain-like nature of AMQPDataConsumers to support
spawning (re-)sharded consumers both on single- and
multi-node Erlang clusters.

Signed-off-by: Arnaldo Cesco <arnaldo.cesco@secomind.com>
  • Loading branch information
Annopaolo committed Jan 14, 2025
1 parent e9f79a4 commit 22c1d49
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 17 deletions.
2 changes: 1 addition & 1 deletion lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule Mississippi.Consumer do

channels_per_connection = amqp_consumer_options[:channels]

queue_count = queue_config[:range_end] - queue_config[:range_start] + 1
queue_count = queue_config[:total_count]

# Invariant: we use one channel for one queue.
connection_number = Kernel.ceil(queue_count / channels_per_connection)
Expand Down
6 changes: 6 additions & 0 deletions lib/consumer/amqp_data_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ defmodule Mississippi.Consumer.AMQPDataConsumer do
{:stop, :normal, state}
end

# Handles an `:EXIT` message whose reason is `{:shutdown, :process_redistribution}`.
# The event is logged and the server stops with reason `:normal` rather than
# with the shutdown reason, so we're not restarted.
def handle_info({:EXIT, _sender, {:shutdown, :process_redistribution}}, state) do
  message = "AMQPDataConsumer shutting down due to process redistribution"
  _ = Logger.info(message)

  {:stop, :normal, state}
end

# Fallback for any other `:EXIT` message: stop the server with the very same
# reason carried by the message, leaving the state untouched.
def handle_info({:EXIT, _sender, exit_reason}, state), do: {:stop, exit_reason, state}
Expand Down
19 changes: 19 additions & 0 deletions lib/consumer/amqp_data_consumer/starter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright 2024 SECO Mind Srl
# SPDX-License-Identifier: Apache-2.0

defmodule Mississippi.Consumer.AMQPDataConsumer.Starter do
  @moduledoc false

  # One-shot task that asks `AMQPDataConsumer.Supervisor` to start the consumers
  # described by the arguments it is given. Declared `:transient`, i.e. it is
  # restarted only if it terminates abnormally.
  use Task, restart: :transient

  alias Mississippi.Consumer.AMQPDataConsumer

  require Logger

  # Spawns the linked task; `args` is handed unchanged to `run/1`.
  def start_link(args), do: Task.start_link(__MODULE__, :run, [args])

  # Task body: delegates to the supervisor and returns whatever it returns.
  def run(args), do: AMQPDataConsumer.Supervisor.start_consumers(args)
end
57 changes: 47 additions & 10 deletions lib/consumer/amqp_data_consumer_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,67 @@

defmodule Mississippi.Consumer.AMQPDataConsumer.Supervisor do
@moduledoc false
use Supervisor
use Horde.DynamicSupervisor

alias Horde.DynamicSupervisor
alias Mississippi.Consumer.AMQPDataConsumer

require Logger

def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
DynamicSupervisor.start_link(__MODULE__, init_arg,
name: __MODULE__,
distribution_strategy: Horde.UniformQuorumDistribution
)
end

@impl true
def init(init_arg) do
children =
amqp_data_consumers_childspecs(init_arg[:queues_config])
def init(_init_arg) do
DynamicSupervisor.init(
members: :auto,
strategy: :one_for_one,
process_redistribution: :active
)
end

# Makes sure that the number of children of this supervisor equals
# `queues_config[:total_count]`.
#
# While the two numbers differ, the children are (re)started and the count is
# checked again after a 2 second pause, for at most 10 start attempts.
#
# Returns `:ok` as soon as the counts match, or
# `{:error, :cannot_start_consumers}` when they still differ after the last attempt.
def start_consumers(queues_config) do
  start_consumers(queues_config, 10)
end

defp start_consumers(queues_config, attempts_left) do
  cond do
    # Always check the count first, so that a last attempt that succeeded
    # is reported as `:ok` instead of being mistaken for a failure.
    all_consumers_running?(queues_config) ->
      :ok

    attempts_left == 0 ->
      _ = Logger.warning("Cannot start AMQPDataConsumers")
      {:error, :cannot_start_consumers}

    true ->
      start_children(queues_config)
      # TODO: do we want something more refined, e.g. exponential backoff?
      Process.sleep(:timer.seconds(2))
      start_consumers(queues_config, attempts_left - 1)
  end
end

# True when the supervisor reports exactly `queues_config[:total_count]` children.
defp all_consumers_running?(queues_config) do
  running = length(DynamicSupervisor.which_children(__MODULE__))
  running === queues_config[:total_count]
end

opts = [strategy: :one_for_one]
defp start_children(queues_config) do
children = amqp_data_consumers_childspecs(queues_config)

Supervisor.init(children, opts)
Enum.each(children, fn child ->
DynamicSupervisor.start_child(Mississippi.Consumer.AMQPDataConsumer.Supervisor, child)
end)
end

defp amqp_data_consumers_childspecs(queues_config) do
queue_range_start = queues_config[:range_start]
queue_range_end = queues_config[:range_end]
queue_total = queues_config[:total_count]
queue_prefix = queues_config[:prefix]

for queue_index <- queue_range_start..queue_range_end do
for queue_index <- 0..(queue_total - 1) do
queue_name = "#{queue_prefix}#{queue_index}"

init_args = [
Expand Down
17 changes: 13 additions & 4 deletions lib/consumer/consumers_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,17 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do
{Registry, [keys: :unique, name: MessageTracker.Registry, members: :auto]},
{Registry, [keys: :unique, name: AMQPDataConsumer.Registry, members: :auto]},
{DataUpdater.Supervisor, message_handler: message_handler},
{DynamicSupervisor, strategy: :one_for_one, name: MessageTracker.Supervisor, members: :auto},
{AMQPDataConsumer.Supervisor, queues_config: queues_config}
{DynamicSupervisor,
strategy: :one_for_one,
name: MessageTracker.Supervisor,
members: :auto,
process_redistribution: :active,
distribution_strategy: Horde.UniformQuorumDistribution},
{AMQPDataConsumer.Supervisor, queues_config: queues_config},
# This will make queue listeners start after re-sharding in a multi-node cluster
{NodeListener, queues_config},
# This will make queue listeners start in a single-node cluster
{AMQPDataConsumer.Starter, queues_config}
]

opts = [strategy: :rest_for_one]
Expand Down Expand Up @@ -66,16 +75,16 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do
],
range_start: [
type: :non_neg_integer,
default: 0,
doc: """
The start index of the range of queues that this Mississippi consumer instance will handle.
This option is deprecated and will be ignored.
"""
],
range_end: [
type: :non_neg_integer,
default: 127,
doc: """
The end index of the range of queues that this Mississippi consumer instance will handle.
This option is deprecated and will be ignored.
"""
],
prefix: [
Expand Down
4 changes: 4 additions & 0 deletions lib/consumer/data_updater.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ defmodule Mississippi.Consumer.DataUpdater do
end
end

# One-argument variant of `start_link/2`: uses
# `Mississippi.Consumer.DataUpdater.Handler.Impl` as the message handler.
def start_link(start_args) do
  default_handler = {:message_handler, Mississippi.Consumer.DataUpdater.Handler.Impl}
  start_link(default_handler, start_args)
end

def start_link(extra_args, start_args) do
{:message_handler, message_handler} = extra_args
sharding_key = Keyword.fetch!(start_args, :sharding_key)
Expand Down
13 changes: 11 additions & 2 deletions lib/consumer/data_updater_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,22 @@ defmodule Mississippi.Consumer.DataUpdater.Supervisor do
require Logger

def start_link(init_args) do
DynamicSupervisor.start_link(__MODULE__, init_args, name: __MODULE__)
DynamicSupervisor.start_link(__MODULE__, init_args,
name: __MODULE__,
distribution_strategy: UniformQuorumDistribution
)
end

@impl true
def init(init_args) do
_ = Logger.info("Starting DataUpdater supervisor")
DynamicSupervisor.init(strategy: :one_for_one, members: :auto, extra_arguments: init_args)

DynamicSupervisor.init(
strategy: :one_for_one,
members: :auto,
process_redistribution: :active,
extra_arguments: init_args
)
end

def start_child(child) do
Expand Down
27 changes: 27 additions & 0 deletions lib/consumer/node_listener.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule NodeListener do
  @moduledoc false

  # GenServer that subscribes to node up/down notifications for visible nodes
  # and, on every notification, asks `AMQPDataConsumer.Supervisor` to start the
  # queue consumers again. Its state is the queues configuration it was
  # started with, which is never modified.
  use GenServer

  alias Mississippi.Consumer.AMQPDataConsumer

  require Logger

  def start_link(args), do: GenServer.start_link(__MODULE__, args)

  @impl true
  def init(queues_config) do
    # Because a non-empty option list is given, notifications are delivered as
    # `{:nodeup | :nodedown, node, info}`, where `info` is a list of 2-tuples.
    # The return value is deliberately not matched, as in the original code.
    :net_kernel.monitor_nodes(true, node_type: :visible)
    {:ok, queues_config}
  end

  @impl true
  def handle_info({:nodeup, node, info}, queues_config) do
    handle_node_event(node, info, "up", queues_config)
  end

  def handle_info({:nodedown, node, info}, queues_config) do
    handle_node_event(node, info, "down", queues_config)
  end

  # Any other message is ignored: without this clause an unexpected message
  # would crash the server with a FunctionClauseError.
  def handle_info(_message, queues_config) do
    {:noreply, queues_config}
  end

  # Logs the cluster change and triggers a new round of consumer startup.
  # The node type is read from the notification's info list.
  defp handle_node_event(node, info, status, queues_config) do
    _ = Logger.info("Node #{inspect(node)} of type #{inspect(info[:node_type])} is #{status}")
    _ = AMQPDataConsumer.Supervisor.start_consumers(queues_config)
    {:noreply, queues_config}
  end
end

0 comments on commit 22c1d49

Please sign in to comment.