From 465b19e8e80362e7462222d6a20a65dbe0f4d386 Mon Sep 17 00:00:00 2001
From: Karl Nilsson <kjnilsson@gmail.com>
Date: Wed, 4 Sep 2024 12:29:22 +0100
Subject: [PATCH 1/2] Adjust vheap sizes for message handling processes in OTP
 27

OTP 27 reset all assumptions on how the vm reacts to processes that
buffer and process a lot of large binaries.

Substantially increasing the vheap sizes for such process restores
most of the same performance by allowing processes to hold more binary
data before major garbage collections are triggered.

This introduces a new module to capture process flag configurations.

The new vheap sizes are only applied when running on OTP 27 or
above.
---
 deps/rabbit/app.bzl                     |  3 +++
 deps/rabbit/src/rabbit_amqp_session.erl |  2 +-
 deps/rabbit/src/rabbit_channel.erl      | 15 ++++++++++++
 deps/rabbit/src/rabbit_process_flag.erl | 32 +++++++++++++++++++++++++
 deps/rabbit/src/rabbit_ra_systems.erl   | 13 +++++++++-
 deps/rabbit_common/src/code_version.erl |  1 +
 moduleindex.yaml                        |  1 +
 7 files changed, 65 insertions(+), 2 deletions(-)
 create mode 100644 deps/rabbit/src/rabbit_process_flag.erl

diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl
index cf5a2d1769b7..6b8702847043 100644
--- a/deps/rabbit/app.bzl
+++ b/deps/rabbit/app.bzl
@@ -194,6 +194,7 @@ def all_beam_files(name = "all_beam_files"):
             "src/rabbit_prelaunch_logging.erl",
             "src/rabbit_priority_queue.erl",
             "src/rabbit_process.erl",
+            "src/rabbit_process_flag.erl",
             "src/rabbit_queue_consumers.erl",
             "src/rabbit_queue_decorator.erl",
             "src/rabbit_queue_index.erl",
@@ -452,6 +453,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
             "src/rabbit_prelaunch_logging.erl",
             "src/rabbit_priority_queue.erl",
             "src/rabbit_process.erl",
+            "src/rabbit_process_flag.erl",
             "src/rabbit_queue_consumers.erl",
             "src/rabbit_queue_decorator.erl",
             "src/rabbit_queue_index.erl",
@@ -733,6 +735,7 @@ def all_srcs(name = "all_srcs"):
             "src/rabbit_prelaunch_logging.erl",
             "src/rabbit_priority_queue.erl",
             "src/rabbit_process.erl",
+            "src/rabbit_process_flag.erl",
             "src/rabbit_queue_consumers.erl",
             "src/rabbit_queue_decorator.erl",
             "src/rabbit_queue_index.erl",
diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl
index 3be9ea2b00fc..e95de1ca9fe0 100644
--- a/deps/rabbit/src/rabbit_amqp_session.erl
+++ b/deps/rabbit/src/rabbit_amqp_session.erl
@@ -391,7 +391,7 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
          outgoing_window = ?UINT(RemoteOutgoingWindow),
          handle_max = ClientHandleMax}}) ->
     process_flag(trap_exit, true),
-    process_flag(message_queue_data, off_heap),
+    rabbit_process_flag:adjust_for_message_handling_proc(),
 
     ok = pg:join(pg_scope(), self(), self()),
     Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 7eee4f0c81d4..f6d3657147f8 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -484,6 +484,8 @@ update_user_state(Pid, UserState) when is_pid(Pid) ->
 init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
       Capabilities, CollectorPid, LimiterPid, AmqpParams]) ->
     process_flag(trap_exit, true),
+    rabbit_process_flag:adjust_for_message_handling_proc(),
+
     ?LG_PROCESS_TYPE(channel),
     ?store_proc_name({ConnName, Channel}),
     ok = pg_local:join(rabbit_channels, self()),
@@ -2785,3 +2787,16 @@ maybe_decrease_global_publishers(#ch{publishing_mode = true}) ->
 
 is_global_qos_permitted() ->
     rabbit_deprecated_features:is_permitted(global_qos).
+
+adjust_vheap() ->
+    case code_version:get_otp_version() of
+        OtpMaj when OtpMaj >= 27 ->
+            %% 46422 is the default min_bin_vheap_size and for OTP 27 and above
+            %% we want to substantially increase it for processes that may buffer
+            %% messages. 32x has proven workable in testing whilst not being
+            %% rediculously large
+            process_flag(min_bin_vheap_size, 46422 * 32);
+        _ ->
+            ok
+    end.
+
diff --git a/deps/rabbit/src/rabbit_process_flag.erl b/deps/rabbit/src/rabbit_process_flag.erl
new file mode 100644
index 000000000000..32c8f1562579
--- /dev/null
+++ b/deps/rabbit/src/rabbit_process_flag.erl
@@ -0,0 +1,32 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
+%%
+
+-module(rabbit_process_flag).
+
+
+-export([adjust_for_message_handling_proc/0
+        ]).
+
+%% @doc Enqueues a message.
+%% Adjust process flags for processes that handle RabbitMQ messages.
+%% For example any process that uses the `rabbit_queue_type' module
+%% may benefit from this tuning.
+%% @returns `ok'
+-spec adjust_for_message_handling_proc() -> ok.
+adjust_for_message_handling_proc() ->
+    process_flag(message_queue_data, off_heap),
+    case code_version:get_otp_version() of
+        OtpMaj when OtpMaj >= 27 ->
+            %% 46422 is the default min_bin_vheap_size and for OTP 27 and above
+            %% we want to substantially increase it for processes that may buffer
+            %% messages. 32x has proven workable in testing whilst not being
+            %% rediculously large
+            process_flag(min_bin_vheap_size, 46422 * 32),
+            ok;
+        _ ->
+            ok
+    end.
diff --git a/deps/rabbit/src/rabbit_ra_systems.erl b/deps/rabbit/src/rabbit_ra_systems.erl
index 08e15ecb53ba..033c76132522 100644
--- a/deps/rabbit/src/rabbit_ra_systems.erl
+++ b/deps/rabbit/src/rabbit_ra_systems.erl
@@ -24,6 +24,9 @@
 -define(COORD_WAL_MAX_SIZE_B, 64_000_000).
 -define(QUORUM_AER_MAX_RPC_SIZE, 16).
 -define(QUORUM_DEFAULT_WAL_MAX_ENTRIES, 500_000).
+%% the default min bin vheap value in OTP 26
+-define(MIN_BIN_VHEAP_SIZE_DEFAULT, 46422).
+-define(MIN_BIN_VHEAP_SIZE_MULT, 64).
 
 -spec setup() -> ok | no_return().
 
@@ -107,7 +110,6 @@ ensure_ra_system_started(RaSystem) ->
     end.
 
 -spec get_config(ra_system_name()) -> ra_system:config().
-
 get_config(quorum_queues = RaSystem) ->
     DefaultConfig = get_default_config(),
     Checksums = application:get_env(rabbit, quorum_compute_checksums, true),
@@ -124,7 +126,16 @@ get_config(quorum_queues = RaSystem) ->
     AERBatchSize = application:get_env(rabbit, quorum_max_append_entries_rpc_batch_size,
                                        ?QUORUM_AER_MAX_RPC_SIZE),
     CompressMemTables = application:get_env(rabbit, quorum_compress_mem_tables, true),
+    MinBinVheapSize = case code_version:get_otp_version() of
+                          OtpMaj when OtpMaj >= 27 ->
+                              ?MIN_BIN_VHEAP_SIZE_DEFAULT * ?MIN_BIN_VHEAP_SIZE_MULT;
+                          _ ->
+                              ?MIN_BIN_VHEAP_SIZE_DEFAULT
+                      end,
+
     DefaultConfig#{name => RaSystem,
+                   wal_min_bin_vheap_size => MinBinVheapSize,
+                   server_min_bin_vheap_size => MinBinVheapSize,
                    default_max_append_entries_rpc_batch_size => AERBatchSize,
                    wal_compute_checksums => WalChecksums,
                    wal_max_entries => WalMaxEntries,
diff --git a/deps/rabbit_common/src/code_version.erl b/deps/rabbit_common/src/code_version.erl
index 568a6e7c439a..af90f73d941f 100644
--- a/deps/rabbit_common/src/code_version.erl
+++ b/deps/rabbit_common/src/code_version.erl
@@ -116,6 +116,7 @@ get_forms(Code) ->
             throw({no_abstract_code, Reason})
     end.
 
+-spec get_otp_version() -> non_neg_integer().
 get_otp_version() ->
     Version = erlang:system_info(otp_release),
     case re:run(Version, "^[0-9][0-9]", [{capture, first, list}]) of
diff --git a/moduleindex.yaml b/moduleindex.yaml
index cbcc44019c66..969c58a7ace3 100755
--- a/moduleindex.yaml
+++ b/moduleindex.yaml
@@ -696,6 +696,7 @@ rabbit:
 - rabbit_prelaunch_logging
 - rabbit_priority_queue
 - rabbit_process
+- rabbit_process_flag
 - rabbit_queue_consumers
 - rabbit_queue_decorator
 - rabbit_queue_index

From 6a7f8d0d1e9280c1ea9c210b50423785d86a0ba2 Mon Sep 17 00:00:00 2001
From: Michal Kuratczyk <mkuratczyk@vmware.com>
Date: Tue, 10 Sep 2024 11:09:31 +0200
Subject: [PATCH 2/2] Remove redundant copy of
 adjust_for_message_handling_proc/0

---
 deps/rabbit/src/rabbit_channel.erl      | 13 -------------
 deps/rabbit/src/rabbit_process_flag.erl |  3 +--
 2 files changed, 1 insertion(+), 15 deletions(-)

diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index f6d3657147f8..71fa9be6f305 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -2787,16 +2787,3 @@ maybe_decrease_global_publishers(#ch{publishing_mode = true}) ->
 
 is_global_qos_permitted() ->
     rabbit_deprecated_features:is_permitted(global_qos).
-
-adjust_vheap() ->
-    case code_version:get_otp_version() of
-        OtpMaj when OtpMaj >= 27 ->
-            %% 46422 is the default min_bin_vheap_size and for OTP 27 and above
-            %% we want to substantially increase it for processes that may buffer
-            %% messages. 32x has proven workable in testing whilst not being
-            %% rediculously large
-            process_flag(min_bin_vheap_size, 46422 * 32);
-        _ ->
-            ok
-    end.
-
diff --git a/deps/rabbit/src/rabbit_process_flag.erl b/deps/rabbit/src/rabbit_process_flag.erl
index 32c8f1562579..fc74c25f554e 100644
--- a/deps/rabbit/src/rabbit_process_flag.erl
+++ b/deps/rabbit/src/rabbit_process_flag.erl
@@ -11,7 +11,6 @@
 -export([adjust_for_message_handling_proc/0
         ]).
 
-%% @doc Enqueues a message.
 %% Adjust process flags for processes that handle RabbitMQ messages.
 %% For example any process that uses the `rabbit_queue_type' module
 %% may benefit from this tuning.
@@ -24,7 +23,7 @@ adjust_for_message_handling_proc() ->
             %% 46422 is the default min_bin_vheap_size and for OTP 27 and above
             %% we want to substantially increase it for processes that may buffer
             %% messages. 32x has proven workable in testing whilst not being
-            %% rediculously large
+            %% ridiculously large
             process_flag(min_bin_vheap_size, 46422 * 32),
             ok;
         _ ->