Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic queue sharding #21

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule Mississippi.Consumer do

channels_per_connection = amqp_consumer_options[:channels]

queue_count = queue_config[:range_end] - queue_config[:range_start] + 1
queue_count = queue_config[:total_count]

# Invariant: we use one channel for one queue.
connection_number = Kernel.ceil(queue_count / channels_per_connection)
Expand Down
6 changes: 6 additions & 0 deletions lib/consumer/amqp_data_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ defmodule Mississippi.Consumer.AMQPDataConsumer do
{:stop, :normal, state}
end

# An exit signal tagged `{:shutdown, :process_redistribution}` is treated as an
# orderly hand-over (presumably issued when the distributed supervisor moves this
# consumer to another node; confirm against the supervisor configuration).
def handle_info({:EXIT, _pid, {:shutdown, :process_redistribution}}, state) do
  _ = Logger.info("AMQPDataConsumer shutting down due to process redistribution")
  # Stop with :normal instead of the shutdown reason so we're not restarted
  {:stop, :normal, state}
end

# Any other exit signal stops the consumer with the very same reason it received.
def handle_info({:EXIT, _pid, exit_reason}, state), do: {:stop, exit_reason, state}
Expand Down
72 changes: 72 additions & 0 deletions lib/consumer/amqp_data_consumer/starter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright 2025 SECO Mind Srl
# SPDX-License-Identifier: Apache-2.0

defmodule Mississippi.Consumer.AMQPDataConsumer.Starter do
@moduledoc false
use Task, restart: :transient

alias Horde.DynamicSupervisor
alias Mississippi.Consumer.AMQPDataConsumer

require Logger

@restart_backoff :timer.seconds(2)

# Spawns a linked task that runs `start_consumers/1` with the given queues configuration.
def start_link(queues_config),
  do: Task.start_link(__MODULE__, :start_consumers, [queues_config])

# Public entry point: tries to start the consumers with a budget of 10 attempts.
def start_consumers(queues_config), do: start_consumers(queues_config, 10)

# No attempts left: log a warning and report the failure to the caller.
defp start_consumers(_queues_config, 0) do
  _ = Logger.warning("Cannot start AMQPDataConsumers")

  {:error, :cannot_start_consumers}
end

# Makes one attempt at starting all the consumers and checks the outcome; `retry` is
# the number of attempts left, including this one.
#
# Returns `:ok` once the number of children reported by `AMQPDataConsumer.Supervisor`
# equals `queues_config[:total_count]`. Otherwise it sleeps and calls itself again
# with `retry - 1`.
defp start_consumers(queues_config, retry) do
start_amqp_consumers(queues_config)

queue_total = queues_config[:total_count]

# Success is judged by what the supervisor currently reports, since
# `start_amqp_consumers/1` discards the individual `start_child` results.
child_count =
AMQPDataConsumer.Supervisor |> DynamicSupervisor.which_children() |> Enum.count()

case child_count do
^queue_total ->
:ok

_ ->
# TODO: do we want something more refined, e.g. exponential backoff?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly even with some randomness so that we avoid all nodes performing these checks in sync

# `:rand.uniform/1` returns an integer in `1..@restart_backoff`, so the total wait
# is randomised between just over `@restart_backoff` and twice that value.
backoff_delta = :rand.uniform(@restart_backoff)
Process.sleep(@restart_backoff + backoff_delta)
start_consumers(queues_config, retry - 1)
end
end

# Asks `AMQPDataConsumer.Supervisor` to start one child per configured queue.
# The result of each `start_child` call is ignored; the function returns `:ok`.
def start_amqp_consumers(queues_config) do
  queues_config
  |> amqp_data_consumers_childspecs()
  |> Enum.each(&DynamicSupervisor.start_child(AMQPDataConsumer.Supervisor, &1))
end

# Builds one `{AMQPDataConsumer, init_args}` child spec per queue.
#
# Queues are indexed from 0 to `total_count - 1`; each queue name is the configured
# `:prefix` followed by the index. Both values are read from `queues_config`.
# Returns an empty list when `total_count` is 0.
defp amqp_data_consumers_childspecs(queues_config) do
  queue_prefix = queues_config[:prefix]
  queue_total = queues_config[:total_count]
  max_index = queue_total - 1

  # The explicit `//1` step keeps the range ascending: with a total of 0 a plain
  # `0..-1` would count downwards and yield specs for the indexes 0 and -1.
  for queue_index <- 0..max_index//1 do
    queue_name = "#{queue_prefix}#{queue_index}"

    init_args = [
      queue_name: queue_name,
      queue_index: queue_index
    ]

    {AMQPDataConsumer, init_args}
  end
end
end
40 changes: 0 additions & 40 deletions lib/consumer/amqp_data_consumer_supervisor.ex

This file was deleted.

30 changes: 25 additions & 5 deletions lib/consumer/consumers_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,29 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do
{Registry, [keys: :unique, name: DataUpdater.Registry, members: :auto]},
{Registry, [keys: :unique, name: MessageTracker.Registry, members: :auto]},
{Registry, [keys: :unique, name: AMQPDataConsumer.Registry, members: :auto]},
{DataUpdater.Supervisor, message_handler: message_handler},
{DynamicSupervisor, strategy: :one_for_one, name: MessageTracker.Supervisor, members: :auto},
{AMQPDataConsumer.Supervisor, queues_config: queues_config}
{DynamicSupervisor,
strategy: :one_for_one,
name: DataUpdater.Supervisor,
members: :auto,
process_redistribution: :active,
extra_arguments: [message_handler: message_handler],
distribution_strategy: Horde.UniformQuorumDistribution},
{DynamicSupervisor,
strategy: :one_for_one,
name: MessageTracker.Supervisor,
members: :auto,
process_redistribution: :active,
distribution_strategy: Horde.UniformQuorumDistribution},
{DynamicSupervisor,
strategy: :one_for_one,
name: AMQPDataConsumer.Supervisor,
members: :auto,
process_redistribution: :active,
distribution_strategy: Horde.UniformQuorumDistribution},
# This will make queue listeners start after re-sharding in a multi-node cluster
{NodeListener, queues_config},
# This will make queue listeners start in a single-node cluster
{AMQPDataConsumer.Starter, queues_config}
]

opts = [strategy: :rest_for_one]
Expand Down Expand Up @@ -66,16 +86,16 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do
],
range_start: [
type: :non_neg_integer,
default: 0,
doc: """
The start index of the range of queues that this Mississippi consumer instance will handle.
This option is deprecated and will be ignored.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As these options have been deprecated, I think we should also remove them from test/integration/end_to_end.exs:38

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would leave them there until they are removed

"""
],
range_end: [
type: :non_neg_integer,
default: 127,
doc: """
The end index of the range of queues that this Mississippi consumer instance will handle.
This option is deprecated and will be ignored.
"""
],
prefix: [
Expand Down
10 changes: 9 additions & 1 deletion lib/consumer/data_updater.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Mississippi.Consumer.DataUpdater do
use GenServer, restart: :temporary
use Efx

alias Horde.DynamicSupervisor
alias Horde.Registry
alias Mississippi.Consumer.DataUpdater
alias Mississippi.Consumer.DataUpdater.State
Expand Down Expand Up @@ -51,7 +52,10 @@ defmodule Mississippi.Consumer.DataUpdater do
{:ok, pid()} | {:error, :data_updater_start_fail}
defeffect get_data_updater_process(sharding_key) do
# TODO bring back :offload_start (?)
case DataUpdater.Supervisor.start_child({DataUpdater, sharding_key: sharding_key}) do
case DynamicSupervisor.start_child(
DataUpdater.Supervisor,
{DataUpdater, sharding_key: sharding_key}
) do
{:ok, pid} ->
{:ok, pid}

Expand All @@ -71,6 +75,10 @@ defmodule Mississippi.Consumer.DataUpdater do
end
end

# Single-argument variant: delegates to `start_link/2`, passing
# `Mississippi.Consumer.DataUpdater.Handler.Impl` as the message handler.
def start_link(start_args) do
  default_handler = {:message_handler, Mississippi.Consumer.DataUpdater.Handler.Impl}
  start_link(default_handler, start_args)
end

def start_link(extra_args, start_args) do
{:message_handler, message_handler} = extra_args
sharding_key = Keyword.fetch!(start_args, :sharding_key)
Expand Down
32 changes: 0 additions & 32 deletions lib/consumer/data_updater_supervisor.ex

This file was deleted.

30 changes: 30 additions & 0 deletions lib/consumer/node_listener.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2025 SECO Mind Srl
# SPDX-License-Identifier: Apache-2.0

defmodule NodeListener do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. This module is missing the copyright
  2. If we're going through the burden of having a NodeListener anyway, I think we may as well also set the members of the Horde processes ourselves. This would give us slightly better efficiency, as we would have 1 listener process instead of 6, and we may also have better consistency with cluster membership, as there is only one listener process controlling it for all Horde processes

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After IRL discussion, we decided to keep it as is (copyright aside)

@moduledoc false
use GenServer

alias Mississippi.Consumer.AMQPDataConsumer.Starter

require Logger

# Starts the listener as an unnamed GenServer; `args` is handed to `init/1` untouched.
def start_link(args) do
  GenServer.start_link(__MODULE__, args)
end

# Subscribes this process to node up/down notifications for visible nodes and keeps
# the queues configuration as the server state.
def init(queues_config) do
  monitor_opts = [node_type: :visible]
  :net_kernel.monitor_nodes(true, monitor_opts)

  {:ok, queues_config}
end

# A node joined: log it and make sure the consumers are (re)started.
# NOTE(review): since monitoring is requested with options, the third element is
# presumably the info list attached by `:net_kernel` rather than a bare type; confirm.
def handle_info({:nodeup, node, node_type}, queues_config) do
  _ = Logger.info("Node #{inspect(node)} of type #{inspect(node_type)} is up")
  restart_consumers(queues_config)
end

# A node left: log it and make sure the consumers are (re)started.
def handle_info({:nodedown, node, node_type}, queues_config) do
  _ = Logger.info("Node #{inspect(node)} of type #{inspect(node_type)} is down")
  restart_consumers(queues_config)
end

# Runs the starter synchronously, ignores its result and keeps the state unchanged.
defp restart_consumers(queues_config) do
  _ = Starter.start_consumers(queues_config)
  {:noreply, queues_config}
end
end
8 changes: 7 additions & 1 deletion test/consumer/data_updater_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ defmodule Mississippi.Consumer.DataUpdater.Test do
# Starts, once for the whole test module, the registry and the supervisor named
# `DataUpdater.Registry` and `DataUpdater.Supervisor`, with `MockMessageHandler`
# injected as the message handler through `:extra_arguments`.
setup_all do
  start_supervised!({Registry, [keys: :unique, name: DataUpdater.Registry]})

  # Use the raising variant, like for the registry above: a supervisor that fails to
  # start aborts the setup here instead of being silently ignored.
  start_supervised!(
    {DynamicSupervisor,
     strategy: :one_for_one,
     name: DataUpdater.Supervisor,
     members: :auto,
     extra_arguments: [message_handler: MockMessageHandler]}
  )

  :ok
end
Expand Down
Loading