Skip to content

Commit

Permalink
Khepri: clustering
Browse files Browse the repository at this point in the history
  • Loading branch information
dcorbacho committed Mar 27, 2023
1 parent 2a7d355 commit 0f152cb
Show file tree
Hide file tree
Showing 13 changed files with 931 additions and 156 deletions.
13 changes: 7 additions & 6 deletions deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -356,19 +356,20 @@ run_prelaunch_second_phase() ->
%% 3. Logging.
ok = rabbit_prelaunch_logging:setup(Context),

%% The clustering steps requires Khepri to be started to check for consistency
ok = rabbit_ra_systems:setup(Context),

%% Khepri requires the "coordination" Ra system to be started by the
%% previous call, but will ensure it runs anyway.
ok = rabbit_khepri:setup(Context),

%% 4. Clustering.
ok = rabbit_prelaunch_cluster:setup(Context),

%% Start Mnesia now that everything is ready.
?LOG_DEBUG("Starting Mnesia"),
ok = mnesia:start(),

ok = rabbit_ra_systems:setup(Context),

%% Khepri requires the "coordination" Ra system to be started by the
%% previous call, but will ensure it runs anyway.
ok = rabbit_khepri:setup(Context),

?LOG_DEBUG(""),
?LOG_DEBUG("== Prelaunch DONE =="),

Expand Down
3 changes: 3 additions & 0 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1720,6 +1720,9 @@ forget_all_durable(Node) ->
forget_node_for_queue(_DeadNode, Q)
when ?amqqueue_is_quorum(Q) ->
ok;
forget_node_for_queue(_DeadNode, Q)
when ?amqqueue_is_stream(Q) ->
ok;
forget_node_for_queue(DeadNode, Q) ->
RS = amqqueue:get_recoverable_slaves(Q),
forget_node_for_queue(DeadNode, RS, Q).
Expand Down
4 changes: 0 additions & 4 deletions deps/rabbit/src/rabbit_channel_tracking_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ handle_event(#event{type = connection_closed, props = Details}, State) ->
handle_event(#event{type = user_deleted, props = Details}, State) ->
ok = rabbit_channel_tracking:update_tracked({user_deleted, Details}),
{ok, State};
%% A node had been deleted from the cluster.
handle_event(#event{type = node_deleted, props = Details}, State) ->
ok = rabbit_channel_tracking:update_tracked({node_deleted, Details}),
{ok, State};
handle_event(_Event, State) ->
{ok, State}.

Expand Down
12 changes: 10 additions & 2 deletions deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,16 @@ mds_phase1_migration_enable(#{feature_name := FeatureName}) ->
Tables = ?MDS_PHASE1_TABLES,
global:set_lock({FeatureName, self()}),
Ret = case rabbit_khepri:is_ready() of
true -> ok;
false -> mds_migration_enable(FeatureName, Tables)
true ->
ok;
false ->
ClusterNodes = rabbit_mnesia:cluster_nodes(all),
case rabbit_khepri:init_cluster(ClusterNodes) of
ok ->
mds_migration_enable(FeatureName, Tables);
{error, Reason} ->
{error, {migration_failure, Reason}}
end
end,
global:del_lock({FeatureName, self()}),
Ret.
Expand Down
52 changes: 45 additions & 7 deletions deps/rabbit/src/rabbit_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@
init() ->
rabbit_db:run(
#{mnesia => fun() -> init_in_mnesia() end,
khepri => fun() -> init_in_khepri() end
khepri => fun() ->
init_in_khepri(),
init_in_mnesia()
end
}).

init_in_mnesia() ->
Expand Down Expand Up @@ -82,27 +85,53 @@ init_using_mnesia() ->
%% @doc Resets the database and the node.

reset() ->
reset_using_mnesia().
rabbit_log:info("Resetting Rabbit", []),
run(
#{mnesia => fun() -> reset_using_mnesia() end,
khepri => fun() ->
case reset_using_khepri() of
ok -> reset_using_mnesia();
Error -> Error
end
end
}).

reset_using_mnesia() ->
?LOG_DEBUG(
"DB: resetting node",
#{domain => ?RMQLOG_DOMAIN_DB}),
rabbit_mnesia:reset().

reset_using_khepri() ->
?LOG_DEBUG(
"DB: resetting node",
#{domain => ?RMQLOG_DOMAIN_DB}),
rabbit_khepri:reset().

-spec force_reset() -> Ret when
Ret :: ok.
%% @doc Resets the database and the node.

force_reset() ->
force_reset_using_mnesia().
?LOG_DEBUG(
"DB: resetting node forcefully",
#{domain => ?RMQLOG_DOMAIN_DB}),
run(
#{mnesia => fun() -> force_reset_using_mnesia() end,
khepri => fun() ->
case force_reset_using_khepri() of
ok -> force_reset_using_mnesia();
Error -> Error
end
end
}).

force_reset_using_mnesia() ->
?LOG_DEBUG(
"DB: resetting node forcefully",
#{domain => ?RMQLOG_DOMAIN_DB}),
rabbit_mnesia:force_reset().

force_reset_using_khepri() ->
rabbit_khepri:force_reset().

-spec force_load_on_next_boot() -> Ret when
Ret :: ok.
%% @doc Requests that the database to be forcefully loaded during next boot.
Expand All @@ -111,7 +140,16 @@ force_reset_using_mnesia() ->
%% state, like if critical members are MIA.

force_load_on_next_boot() ->
force_load_on_next_boot_using_mnesia().
run(
#{mnesia => fun() -> force_load_on_next_boot_using_mnesia() end,
khepri => fun() ->
%% TODO force load using Khepri might need to be implemented
%% for disaster recovery scenarios where just a minority of
%% nodes are accessible. Potentially, it could also be replaced
%% with a way to export all the data.
force_load_on_next_boot_using_mnesia()
end
}).

force_load_on_next_boot_using_mnesia() ->
?LOG_DEBUG(
Expand Down
101 changes: 92 additions & 9 deletions deps/rabbit/src/rabbit_db_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,68 @@ join(RemoteNode, NodeType)
when is_atom(RemoteNode) andalso ?IS_NODE_TYPE(NodeType) ->
?LOG_DEBUG(
"DB: joining cluster using remote node `~ts`", [RemoteNode],
#{domain => ?RMQLOG_DOMAIN_DB}),
join_using_mnesia(RemoteNode, NodeType).
#{domain => ?RMQLOG_DOMAIN_DB}),
case rabbit_db:run(
#{mnesia => fun() -> join_using_mnesia(RemoteNode, NodeType) end,
khepri => fun() -> join_using_khepri(RemoteNode, NodeType) end
}) of
ok ->
rabbit_node_monitor:notify_joined_cluster(),
ok;
Other ->
Other
end.

join_using_mnesia(RemoteNode, NodeType) ->
rabbit_mnesia:join_cluster(RemoteNode, NodeType).

join_using_khepri(_RemoteNode, ram) ->
rabbit_log:warning("Join node with --ram flag is not supported by Khepri. Skipping..."),
{error, not_supported};
join_using_khepri(RemoteNode, NodeType) ->
case rabbit_khepri:check_join_cluster(RemoteNode) of
ok ->
case join_using_mnesia(RemoteNode, NodeType) of
ok ->
rabbit_khepri:join_cluster(RemoteNode);
{ok, already_member} ->
rabbit_khepri:join_cluster(RemoteNode);
Error ->
Error
end;
{ok, already_member} ->
join_using_mnesia(RemoteNode, NodeType);
Error ->
Error
end.

-spec forget_member(Node, RemoveWhenOffline) -> ok when
Node :: node(),
RemoveWhenOffline :: boolean().
%% @doc Removes `Node' from the cluster.

forget_member(Node, RemoveWhenOffline) ->
forget_member_using_mnesia(Node, RemoveWhenOffline).
rabbit_db:run(
#{mnesia => fun() -> forget_member_using_mnesia(Node, RemoveWhenOffline) end,
khepri => fun() ->
case forget_member_using_khepri(Node, RemoveWhenOffline) of
ok ->
forget_member_using_mnesia(Node, RemoveWhenOffline);
Error ->
Error
end
end
}).

forget_member_using_mnesia(Node, RemoveWhenOffline) ->
rabbit_mnesia:forget_cluster_node(Node, RemoveWhenOffline).

forget_member_using_khepri(_Node, true) ->
rabbit_log:warning("Remove node with --offline flag is not supported by Khepri. Skipping..."),
{error, not_supported};
forget_member_using_khepri(Node, false = _RemoveWhenOffline) ->
rabbit_khepri:leave_cluster(Node).

%% -------------------------------------------------------------------
%% Cluster update.
%% -------------------------------------------------------------------
Expand All @@ -75,11 +120,18 @@ forget_member_using_mnesia(Node, RemoveWhenOffline) ->
%% Node types may not all be valid with all databases.

change_node_type(NodeType) ->
change_node_type_using_mnesia(NodeType).
rabbit_db:run(
#{mnesia => fun() -> change_node_type_using_mnesia(NodeType) end,
khepri => fun() -> change_node_type_using_khepri(NodeType) end
}).

change_node_type_using_mnesia(NodeType) ->
rabbit_mnesia:change_cluster_node_type(NodeType).

change_node_type_using_khepri(_NodeType) ->
rabbit_log:warning("Change cluster node type is not supported by Khepri. Only disc nodes are allowed. Skipping..."),
{error, not_supported}.

%% -------------------------------------------------------------------
%% Cluster status.
%% -------------------------------------------------------------------
Expand All @@ -89,17 +141,26 @@ change_node_type_using_mnesia(NodeType) ->
%% @doc Indicates if this node is clustered with other nodes or not.

is_clustered() ->
is_clustered_using_mnesia().
rabbit_db:run(
#{mnesia => fun() -> is_clustered_using_mnesia() end,
khepri => fun() -> is_clustered_using_khepri() end
}).

is_clustered_using_mnesia() ->
rabbit_mnesia:is_clustered().

is_clustered_using_khepri() ->
rabbit_khepri:is_clustered().

-spec members() -> Members when
Members :: [node()].
%% @doc Returns the list of cluster members.

members() ->
members_using_mnesia().
rabbit_db:run(
#{mnesia => fun() -> members_using_mnesia() end,
khepri => fun() -> members_using_khepri() end
}).

members_using_mnesia() ->
case rabbit_mnesia:is_running() andalso rabbit_table:is_present() of
Expand All @@ -122,12 +183,18 @@ members_using_mnesia() ->
end
end.

members_using_khepri() ->
rabbit_khepri:nodes().

-spec disc_members() -> Members when
Members :: [node()].
%% @private

disc_members() ->
disc_members_using_mnesia().
rabbit_db:run(
#{mnesia => fun() -> disc_members_using_mnesia() end,
khepri => fun() -> members_using_khepri() end
}).

disc_members_using_mnesia() ->
rabbit_mnesia:cluster_nodes(disc).
Expand All @@ -139,20 +206,36 @@ disc_members_using_mnesia() ->
%% Node types may not all be relevant with all databases.

node_type() ->
node_type_using_mnesia().
rabbit_db:run(
#{mnesia => fun() -> node_type_using_mnesia() end,
khepri => fun() -> node_type_using_khepri() end
}).

node_type_using_mnesia() ->
rabbit_mnesia:node_type().

node_type_using_khepri() ->
disc.

-spec check_consistency() -> ok.
%% @doc Ensures the cluster is consistent.

check_consistency() ->
check_consistency_using_mnesia().
rabbit_db:run(
#{mnesia => fun() -> check_consistency_using_mnesia() end,
khepri => fun() -> case check_consistency_using_khepri() of
ok -> check_consistency_using_mnesia();
Error -> Error
end
end
}).

check_consistency_using_mnesia() ->
rabbit_mnesia:check_cluster_consistency().

check_consistency_using_khepri() ->
rabbit_khepri:check_cluster_consistency().

-spec cli_cluster_status() -> ClusterStatus when
ClusterStatus :: [{nodes, [{rabbit_db_cluster:node_type(), [node()]}]} |
{running_nodes, [node()]} |
Expand Down
19 changes: 15 additions & 4 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -358,14 +358,17 @@ delete_in_mnesia(QueueName, Reason) ->
end).

delete_in_khepri(QueueName) ->
delete_in_khepri(QueueName, false).

delete_in_khepri(QueueName, OnlyDurable) ->
rabbit_khepri:transaction(
fun () ->
Path = khepri_queue_path(QueueName),
case khepri_tx_adv:delete(Path) of
{ok, #{data := _}} ->
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, false);
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
{ok, _} ->
ok
end
Expand All @@ -387,7 +390,7 @@ internal_delete(QueueName, OnlyDurable, Reason) ->
%% HA queues are removed it can be removed.
rabbit_db:run(
#{mnesia => fun() -> internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) end,
khepri => fun() -> ok end
khepri => fun() -> delete_in_khepri(QueueName, OnlyDurable) end
}).

internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) ->
Expand Down Expand Up @@ -970,7 +973,7 @@ foreach_transient_in_mnesia(UpdateFun) ->
foreach_durable(UpdateFun, FilterFun) ->
rabbit_db:run(
#{mnesia => fun() -> foreach_durable_in_mnesia(UpdateFun, FilterFun) end,
khepri => fun() -> ok end
khepri => fun() -> foreach_durable_in_khepri(UpdateFun, FilterFun) end
}).

foreach_durable_in_mnesia(UpdateFun, FilterFun) ->
Expand All @@ -984,7 +987,15 @@ foreach_durable_in_mnesia(UpdateFun, FilterFun) ->
_ = [UpdateFun(Q) || Q <- Qs, FilterFun(Q)],
ok
end),
ok.
ok.

foreach_durable_in_khepri(UpdateFun, FilterFun) ->
Path = khepri_queues_path() ++ [rabbit_khepri:if_has_data_wildcard()],
{ok, Qs} = rabbit_khepri:filter(Path, fun(_, #{data := Q}) ->
FilterFun(Q)
end),
_ = [UpdateFun(Q) || Q <- maps:values(Qs)],
ok.

%% -------------------------------------------------------------------
%% set_dirty().
Expand Down
Loading

0 comments on commit 0f152cb

Please sign in to comment.