From c771b2422a2cbb05541705dfc5cdb66a6d3e4f0b Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 29 Jul 2024 22:48:48 +0200 Subject: [PATCH] Make classic_queue_consumer_unsent_message_limit configurable Similar to other RabbitMQ internal credit flow configurations such as `credit_flow_default_credit` and `msg_store_credit_disc_bound`, this commit makes the `classic_queue_consumer_unsent_message_limit` configurable via `advanced.config`. See https://github.com/rabbitmq/rabbitmq-server/pull/11822 for the original motivation to make this setting configurable. --- deps/rabbit/src/rabbit_queue_consumers.erl | 48 +++++++++++++--------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/deps/rabbit/src/rabbit_queue_consumers.erl b/deps/rabbit/src/rabbit_queue_consumers.erl index 62ae7bd20c20..7a95582a6551 100644 --- a/deps/rabbit/src/rabbit_queue_consumers.erl +++ b/deps/rabbit/src/rabbit_queue_consumers.erl @@ -22,7 +22,8 @@ -define(QUEUE, lqueue). --define(UNSENT_MESSAGE_LIMIT, 200). +-define(KEY_UNSENT_MESSAGE_LIMIT, classic_queue_consumer_unsent_message_limit). +-define(DEFAULT_UNSENT_MESSAGE_LIMIT, 200). %% Utilisation average calculations are all in μs. -define(USE_AVG_HALF_LIFE, 1000000.0). @@ -72,10 +73,15 @@ -spec new() -> state(). -new() -> #state{consumers = priority_queue:new(), - use = {active, - erlang:monotonic_time(micro_seconds), - 1.0}}. +new() -> + Val = application:get_env(rabbit, + ?KEY_UNSENT_MESSAGE_LIMIT, + ?DEFAULT_UNSENT_MESSAGE_LIMIT), + persistent_term:put(?KEY_UNSENT_MESSAGE_LIMIT, Val), + #state{consumers = priority_queue:new(), + use = {active, + erlang:monotonic_time(microsecond), + 1.0}}. -spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'. @@ -286,7 +292,6 @@ deliver_to_consumer(FetchFun, E = {ChPid, Consumer = #consumer{tag = CTag}}, QName) -> C = #cr{link_states = LinkStates} = lookup_ch(ChPid), - ChBlocked = is_ch_blocked(C), case LinkStates of #{CTag := #link_state{delivery_count = DeliveryCount0, credit = Credit} = LinkState0} -> @@ -308,22 +313,24 @@ deliver_to_consumer(FetchFun, block_consumer(C, E), undelivered end; - _ when ChBlocked -> - %% not a link credit consumer, use credit flow - block_consumer(C, E), - undelivered; _ -> %% not a link credit consumer, use credit flow - case rabbit_limiter:can_send(C#cr.limiter, - Consumer#consumer.ack_required, - CTag) of - {suspend, Limiter} -> - block_consumer(C#cr{limiter = Limiter}, E), + case is_ch_blocked(C) of + true -> + block_consumer(C, E), undelivered; - {continue, Limiter} -> - {delivered, deliver_to_consumer( - FetchFun, Consumer, - C#cr{limiter = Limiter}, QName)} + false -> + case rabbit_limiter:can_send(C#cr.limiter, + Consumer#consumer.ack_required, + CTag) of + {suspend, Limiter} -> + block_consumer(C#cr{limiter = Limiter}, E), + undelivered; + {continue, Limiter} -> + {delivered, deliver_to_consumer( + FetchFun, Consumer, + C#cr{limiter = Limiter}, QName)} + end end end. @@ -653,7 +660,8 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}). is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> - Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). + UnsentMessageLimit = persistent_term:get(?KEY_UNSENT_MESSAGE_LIMIT), + Count >= UnsentMessageLimit orelse rabbit_limiter:is_suspended(Limiter). tags(CList) -> [CTag || {_P, {_ChPid, #consumer{tag = CTag}}} <- CList].