Skip to content

Commit

Permalink
Merge pull request #12208 from rabbitmq/qq-otp27-conf
Browse files Browse the repository at this point in the history
Adjust vheap sizes for message handling processes in OTP 27
  • Loading branch information
michaelklishin authored Oct 11, 2024
2 parents fdc6bd1 + 6a7f8d0 commit 1726064
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 2 deletions.
3 changes: 3 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_prelaunch_logging.erl",
"src/rabbit_priority_queue.erl",
"src/rabbit_process.erl",
"src/rabbit_process_flag.erl",
"src/rabbit_queue_consumers.erl",
"src/rabbit_queue_decorator.erl",
"src/rabbit_queue_index.erl",
Expand Down Expand Up @@ -452,6 +453,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_prelaunch_logging.erl",
"src/rabbit_priority_queue.erl",
"src/rabbit_process.erl",
"src/rabbit_process_flag.erl",
"src/rabbit_queue_consumers.erl",
"src/rabbit_queue_decorator.erl",
"src/rabbit_queue_index.erl",
Expand Down Expand Up @@ -733,6 +735,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_prelaunch_logging.erl",
"src/rabbit_priority_queue.erl",
"src/rabbit_process.erl",
"src/rabbit_process_flag.erl",
"src/rabbit_queue_consumers.erl",
"src/rabbit_queue_decorator.erl",
"src/rabbit_queue_index.erl",
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
outgoing_window = ?UINT(RemoteOutgoingWindow),
handle_max = ClientHandleMax}}) ->
process_flag(trap_exit, true),
process_flag(message_queue_data, off_heap),
rabbit_process_flag:adjust_for_message_handling_proc(),

ok = pg:join(pg_scope(), self(), self()),
Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,8 @@ update_user_state(Pid, UserState) when is_pid(Pid) ->
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
Capabilities, CollectorPid, LimiterPid, AmqpParams]) ->
process_flag(trap_exit, true),
rabbit_process_flag:adjust_for_message_handling_proc(),

?LG_PROCESS_TYPE(channel),
?store_proc_name({ConnName, Channel}),
ok = pg_local:join(rabbit_channels, self()),
Expand Down
31 changes: 31 additions & 0 deletions deps/rabbit/src/rabbit_process_flag.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(rabbit_process_flag).


-export([adjust_for_message_handling_proc/0
]).

%% Adjust process flags for processes that handle RabbitMQ messages.
%% For example any process that uses the `rabbit_queue_type' module
%% may benefit from this tuning.
%% @returns `ok'
-spec adjust_for_message_handling_proc() -> ok.
adjust_for_message_handling_proc() ->
process_flag(message_queue_data, off_heap),
case code_version:get_otp_version() of
OtpMaj when OtpMaj >= 27 ->
%% 46422 is the default min_bin_vheap_size and for OTP 27 and above
%% we want to substantially increase it for processes that may buffer
%% messages. 32x has proven workable in testing whilst not being
%% ridiculously large
process_flag(min_bin_vheap_size, 46422 * 32),
ok;
_ ->
ok
end.
13 changes: 12 additions & 1 deletion deps/rabbit/src/rabbit_ra_systems.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
-define(COORD_WAL_MAX_SIZE_B, 64_000_000).
-define(QUORUM_AER_MAX_RPC_SIZE, 16).
-define(QUORUM_DEFAULT_WAL_MAX_ENTRIES, 500_000).
%% the default min bin vheap value in OTP 26
-define(MIN_BIN_VHEAP_SIZE_DEFAULT, 46422).
-define(MIN_BIN_VHEAP_SIZE_MULT, 64).

-spec setup() -> ok | no_return().

Expand Down Expand Up @@ -107,7 +110,6 @@ ensure_ra_system_started(RaSystem) ->
end.

-spec get_config(ra_system_name()) -> ra_system:config().

get_config(quorum_queues = RaSystem) ->
DefaultConfig = get_default_config(),
Checksums = application:get_env(rabbit, quorum_compute_checksums, true),
Expand All @@ -124,7 +126,16 @@ get_config(quorum_queues = RaSystem) ->
AERBatchSize = application:get_env(rabbit, quorum_max_append_entries_rpc_batch_size,
?QUORUM_AER_MAX_RPC_SIZE),
CompressMemTables = application:get_env(rabbit, quorum_compress_mem_tables, true),
MinBinVheapSize = case code_version:get_otp_version() of
OtpMaj when OtpMaj >= 27 ->
?MIN_BIN_VHEAP_SIZE_DEFAULT * ?MIN_BIN_VHEAP_SIZE_MULT;
_ ->
?MIN_BIN_VHEAP_SIZE_DEFAULT
end,

DefaultConfig#{name => RaSystem,
wal_min_bin_vheap_size => MinBinVheapSize,
server_min_bin_vheap_size => MinBinVheapSize,
default_max_append_entries_rpc_batch_size => AERBatchSize,
wal_compute_checksums => WalChecksums,
wal_max_entries => WalMaxEntries,
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit_common/src/code_version.erl
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ get_forms(Code) ->
throw({no_abstract_code, Reason})
end.

-spec get_otp_version() -> non_neg_integer().
get_otp_version() ->
Version = erlang:system_info(otp_release),
case re:run(Version, "^[0-9][0-9]", [{capture, first, list}]) of
Expand Down
1 change: 1 addition & 0 deletions moduleindex.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,7 @@ rabbit:
- rabbit_prelaunch_logging
- rabbit_priority_queue
- rabbit_process
- rabbit_process_flag
- rabbit_queue_consumers
- rabbit_queue_decorator
- rabbit_queue_index
Expand Down

0 comments on commit 1726064

Please sign in to comment.