From 9a02a9add233dc876fea9fc732463112f01b56b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 26 Apr 2024 10:12:22 +0200 Subject: [PATCH] Replicate x_jms_topic_table Mnesia table (backport #11087) The x_jms_topic_table Mnesia table must be on all nodes for messages to be published to JMS topic exchanges and routed to topic subscribers. The table used to be only in RAM on one node, so it would be unavailable when the node was down and empty when it came back up, losing the state for subscribers that were still online because they were connected to other nodes. Inspired by a similar change for the node maintenance status table in #9005. --- deps/rabbit/src/rabbit_maintenance.erl | 49 +++---------------- deps/rabbit/src/rabbit_table.erl | 40 +++++++++++++++ .../src/rabbit_jms_topic_exchange.erl | 23 ++++++--- 3 files changed, 63 insertions(+), 49 deletions(-) diff --git a/deps/rabbit/src/rabbit_maintenance.erl b/deps/rabbit/src/rabbit_maintenance.erl index c071f97a73ee..747d95456e6a 100644 --- a/deps/rabbit/src/rabbit_maintenance.erl +++ b/deps/rabbit/src/rabbit_maintenance.erl @@ -58,47 +58,14 @@ boot() -> rabbit_log:info( "Creating table ~s for maintenance mode status", [TableName]), - try - rabbit_table:create( - TableName, - status_table_definition()), - %% The `rabbit_node_maintenance_states' table used to be global but not - %% replicated. This leads to various errors during RabbitMQ boot or - %% operations on the Mnesia database. The reason is the table existed - %% on a single node and, if that node was stopped or MIA, other nodes - %% may wait forever on that node for the table to be available. - %% - %% The call below makes sure this node has a copy of the table. - case rabbit_table:ensure_table_copy(TableName, node(), ram_copies) of - ok -> - %% Next, we try to fix other nodes in the cluster if they are - %% running a version of RabbitMQ which does not replicate the - %% table. 
All nodes must have a replica for Mnesia operations - %% to work properly. Therefore the code below is to make older - %% compatible with newer nodes. - Replicas = mnesia:table_info(TableName, all_nodes), - Members = rabbit_nodes:list_running(), - MissingOn = Members -- Replicas, - lists:foreach( - fun(Node) -> - %% Errors from adding a replica on those older nodes - %% are ignored however. They should not be fatal. The - %% problem will solve by itself once all nodes are - %% upgraded. - _ = rpc:call( - Node, - rabbit_table, ensure_table_copy, - [TableName, Node, ram_copies]) - end, MissingOn), - ok; - Error -> - Error - end - catch throw:Reason -> - rabbit_log:error( - "Failed to create maintenance status table: ~p", - [Reason]) - end. + %% The `rabbit_node_maintenance_states' table used to be global but not + %% replicated. This leads to various errors during RabbitMQ boot or + %% operations on the Mnesia database. The reason is the table existed + %% on a single node and, if that node was stopped or MIA, other nodes + %% may wait forever on that node for the table to be available. + rabbit_table:create_and_replicate_table( + TableName, + status_table_definition()). %% %% API diff --git a/deps/rabbit/src/rabbit_table.erl b/deps/rabbit/src/rabbit_table.erl index befe8e5f4a50..5b063e646097 100644 --- a/deps/rabbit/src/rabbit_table.erl +++ b/deps/rabbit/src/rabbit_table.erl @@ -9,6 +9,7 @@ -export([ create/0, create/2, ensure_local_copies/1, ensure_table_copy/3, + create_and_replicate_table/2, create_local_copy/2, wait_for_replicated/1, wait/1, wait/2, force_load/0, is_present/0, is_empty/0, needs_default_data/0, check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0, @@ -76,6 +77,7 @@ ensure_secondary_index(Table, Field) -> %% mnesia:table() and mnesia:storage_type() are not exported -type mnesia_table() :: atom(). +-type mnesia_table_definition() :: list(). -type mnesia_storage_type() :: 'ram_copies' | 'disc_copies' | 'disc_only_copies'. 
-spec ensure_table_copy(mnesia_table(), node(), mnesia_storage_type()) -> @@ -89,6 +91,44 @@ ensure_table_copy(TableName, Node, StorageType) -> {aborted, Reason} -> {error, Reason} end. +-spec create_and_replicate_table(mnesia_table(), mnesia_table_definition()) -> + ok | {error, any()}. +create_and_replicate_table(TableName, TableDefinition) -> + try + rabbit_table:create(TableName, TableDefinition), + %% The call below makes sure this node has a copy of the table. + case rabbit_table:ensure_table_copy(TableName, node(), ram_copies) of + ok -> + %% Next, we try to fix other nodes in the cluster if they are + %% running a version of RabbitMQ which does not replicate the + %% table. All nodes must have a replica for Mnesia operations + %% to work properly. Therefore the code below is to make older + %% compatible with newer nodes. + Replicas = mnesia:table_info(TableName, all_nodes), + Members = rabbit_nodes:list_running(), + MissingOn = Members -- Replicas, + lists:foreach( + fun(Node) -> + %% Errors from adding a replica on those older nodes + %% are ignored however. They should not be fatal. The + %% problem will solve by itself once all nodes are + %% upgraded. + _ = rpc:call( + Node, + rabbit_table, ensure_table_copy, + [TableName, Node, ram_copies]) + end, MissingOn), + ok; + Error -> + Error + end + catch throw:Reason -> + rabbit_log:error( + "Failed to create ~tp table: ~tp", + [TableName, Reason]) + end. + + %% This arity only exists for backwards compatibility with certain %% plugins. See https://github.com/rabbitmq/rabbitmq-clusterer/issues/19. 
diff --git a/deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl b/deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl index 59c4f59afb03..5203bdf4f8f9 100644 --- a/deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl +++ b/deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl @@ -55,14 +55,21 @@ % Initialise database table for all exchanges of type <<"x-jms-topic">> setup_db_schema() -> - case mnesia:create_table( ?JMS_TOPIC_TABLE - , [ {attributes, record_info(fields, ?JMS_TOPIC_RECORD)} - , {record_name, ?JMS_TOPIC_RECORD} - , {type, set} ] - ) of - {atomic, ok} -> ok; - {aborted, {already_exists, ?JMS_TOPIC_TABLE}} -> ok - end. + TableName = ?JMS_TOPIC_TABLE, + TableDefinition = [{attributes, record_info(fields, ?JMS_TOPIC_RECORD)}, + {record_name, ?JMS_TOPIC_RECORD}, + {type, set}], + rabbit_log:info( + "Creating table ~ts for JMS topic exchange", + [TableName]), + %% The JMS topic exchange table must be available on all nodes. + %% If it existed on only one node, messages could not be published + %% to JMS topic exchanges and routed to topic subscribers if the node + %% was unavailable. + _ = rabbit_table:create_and_replicate_table( + TableName, + TableDefinition), + ok. %%---------------------------------------------------------------------------- %% R E F E R E N C E T Y P E I N F O R M A T I O N