Skip to content

Commit

Permalink
Merge pull request #11857 from rabbitmq/classic_queue_consumer_unsent…
Browse files Browse the repository at this point in the history
…_message_limit

Make classic_queue_consumer_unsent_message_limit configurable
  • Loading branch information
michaelklishin authored Jul 30, 2024
2 parents 777fd6f + c771b24 commit 03885fa
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions deps/rabbit/src/rabbit_queue_consumers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@

-define(QUEUE, lqueue).

-define(UNSENT_MESSAGE_LIMIT, 200).
-define(KEY_UNSENT_MESSAGE_LIMIT, classic_queue_consumer_unsent_message_limit).
-define(DEFAULT_UNSENT_MESSAGE_LIMIT, 200).

%% Utilisation average calculations are all in μs.
-define(USE_AVG_HALF_LIFE, 1000000.0).
Expand Down Expand Up @@ -72,10 +73,15 @@

-spec new() -> state().

new() -> #state{consumers = priority_queue:new(),
use = {active,
erlang:monotonic_time(micro_seconds),
1.0}}.
new() ->
Val = application:get_env(rabbit,
?KEY_UNSENT_MESSAGE_LIMIT,
?DEFAULT_UNSENT_MESSAGE_LIMIT),
persistent_term:put(?KEY_UNSENT_MESSAGE_LIMIT, Val),
#state{consumers = priority_queue:new(),
use = {active,
erlang:monotonic_time(microsecond),
1.0}}.

-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.

Expand Down Expand Up @@ -286,7 +292,6 @@ deliver_to_consumer(FetchFun,
E = {ChPid, Consumer = #consumer{tag = CTag}},
QName) ->
C = #cr{link_states = LinkStates} = lookup_ch(ChPid),
ChBlocked = is_ch_blocked(C),
case LinkStates of
#{CTag := #link_state{delivery_count = DeliveryCount0,
credit = Credit} = LinkState0} ->
Expand All @@ -308,22 +313,24 @@ deliver_to_consumer(FetchFun,
block_consumer(C, E),
undelivered
end;
_ when ChBlocked ->
%% not a link credit consumer, use credit flow
block_consumer(C, E),
undelivered;
_ ->
%% not a link credit consumer, use credit flow
case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required,
CTag) of
{suspend, Limiter} ->
block_consumer(C#cr{limiter = Limiter}, E),
case is_ch_blocked(C) of
true ->
block_consumer(C, E),
undelivered;
{continue, Limiter} ->
{delivered, deliver_to_consumer(
FetchFun, Consumer,
C#cr{limiter = Limiter}, QName)}
false ->
case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required,
CTag) of
{suspend, Limiter} ->
block_consumer(C#cr{limiter = Limiter}, E),
undelivered;
{continue, Limiter} ->
{delivered, deliver_to_consumer(
FetchFun, Consumer,
C#cr{limiter = Limiter}, QName)}
end
end
end.

Expand Down Expand Up @@ -653,7 +660,8 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}).

is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
UnsentMessageLimit = persistent_term:get(?KEY_UNSENT_MESSAGE_LIMIT),
Count >= UnsentMessageLimit orelse rabbit_limiter:is_suspended(Limiter).

tags(CList) -> [CTag || {_P, {_ChPid, #consumer{tag = CTag}}} <- CList].

Expand Down

0 comments on commit 03885fa

Please sign in to comment.