Skip to content

Commit

Permalink
Qq: adjust checkpointing algo to something more like
Browse files Browse the repository at this point in the history
it was in 3.13.x
  • Loading branch information
kjnilsson committed Aug 9, 2024
1 parent aeedad7 commit b17f444
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 85 deletions.
131 changes: 61 additions & 70 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ init(#{name := Name,
update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
RCI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
Overflow = maps:get(overflow_strategy, Conf, drop_head),
MaxLength = maps:get(max_length, Conf, undefined),
MaxBytes = maps:get(max_bytes, Conf, undefined),
Expand All @@ -206,11 +205,9 @@ update_config(Conf, State) ->
competing
end,
Cfg = State#?STATE.cfg,
RCISpec = {RCI, RCI},

LastActive = maps:get(created, Conf, undefined),
State#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCISpec,
dead_letter_handler = DLH,
State#?STATE{cfg = Cfg#cfg{dead_letter_handler = DLH,
become_leader_handler = BLH,
overflow_strategy = Overflow,
max_length = MaxLength,
Expand Down Expand Up @@ -580,9 +577,8 @@ apply(#{system_time := Ts} = Meta,
Effects = [{monitor, node, Node} | Effects1],
checkout(Meta, State0, State#?STATE{enqueuers = Enqs,
last_active = Ts}, Effects);
apply(#{index := _Idx} = Meta, {down, Pid, _Info}, State0) ->
{State1, Effects1} = activate_next_consumer(
handle_down(Meta, Pid, State0)),
apply(Meta, {down, Pid, _Info}, State0) ->
{State1, Effects1} = activate_next_consumer(handle_down(Meta, Pid, State0)),
checkout(Meta, State0, State1, Effects1);
apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0,
enqueuers = Enqs0,
Expand Down Expand Up @@ -670,7 +666,8 @@ convert_v3_to_v4(#{} = _Meta, StateV3) ->
end, Returns0)),

Messages = rabbit_fifo_q:from_lqueue(Messages0),
#?STATE{cfg = rabbit_fifo_v3:get_field(cfg, StateV3),
Cfg = rabbit_fifo_v3:get_field(cfg, StateV3),
#?STATE{cfg = Cfg#cfg{unused_1 = ?NIL},
messages = Messages,
messages_total = rabbit_fifo_v3:get_field(messages_total, StateV3),
returns = Returns,
Expand Down Expand Up @@ -813,8 +810,7 @@ state_enter0(_, _, Effects) ->
Effects.

-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
tick(Ts, #?STATE{cfg = #cfg{name = _Name,
resource = QName}} = State) ->
tick(Ts, #?STATE{cfg = #cfg{resource = QName}} = State) ->
case is_expired(Ts, State) of
true ->
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}];
Expand All @@ -835,7 +831,6 @@ overview(#?STATE{consumers = Cons,
waiting_consumers = WaitingConsumers} = State) ->
Conf = #{name => Cfg#cfg.name,
resource => Cfg#cfg.resource,
release_cursor_interval => Cfg#cfg.release_cursor_interval,
dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler,
max_length => Cfg#cfg.max_length,
max_bytes => Cfg#cfg.max_bytes,
Expand Down Expand Up @@ -908,9 +903,10 @@ which_module(4) -> ?MODULE.

-record(checkpoint, {index :: ra:index(),
timestamp :: milliseconds(),
enqueue_count :: non_neg_integer(),
smallest_index :: undefined | ra:index(),
messages_total :: non_neg_integer()}).
messages_total :: non_neg_integer(),
indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
unused_1 = ?NIL}).
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
-record(aux, {name :: atom(),
capacity :: term(),
Expand All @@ -934,8 +930,8 @@ init_aux(Name) when is_atom(Name) ->
capacity = {inactive, Now, 1, 1.0},
last_checkpoint = #checkpoint{index = 0,
timestamp = erlang:system_time(millisecond),
enqueue_count = 0,
messages_total = 0}}.
messages_total = 0,
unused_1 = ?NIL}}.

handle_aux(RaftState, Tag, Cmd, #aux{name = Name,
capacity = Cap,
Expand All @@ -959,18 +955,18 @@ handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
case find_consumer(Key, Consumers) of
{ConsumerKey, #consumer{checked_out = Checked}} ->
{RaAux, ToReturn} =
maps:fold(
fun (MsgId, ?MSG(Idx, Header), {RA0, Acc}) ->
%% it is possible this is not found if the consumer
%% crashed and the message got removed
case ra_aux:log_fetch(Idx, RA0) of
{{_Term, _Meta, Cmd}, RA} ->
Msg = get_msg(Cmd),
{RA, [{MsgId, Idx, Header, Msg} | Acc]};
{undefined, RA} ->
{RA, Acc}
end
end, {RaAux0, []}, maps:with(MsgIds, Checked)),
maps:fold(
fun (MsgId, ?MSG(Idx, Header), {RA0, Acc}) ->
%% it is possible this is not found if the consumer
%% crashed and the message got removed
case ra_aux:log_fetch(Idx, RA0) of
{{_Term, _Meta, Cmd}, RA} ->
Msg = get_msg(Cmd),
{RA, [{MsgId, Idx, Header, Msg} | Acc]};
{undefined, RA} ->
{RA, Acc}
end
end, {RaAux0, []}, maps:with(MsgIds, Checked)),

Appends = make_requeue(ConsumerKey, {notify, Corr, Pid},
lists:sort(ToReturn), []),
Expand Down Expand Up @@ -2639,8 +2635,8 @@ suspected_pids_for(Node, #?STATE{consumers = Cons0,
end, Enqs, WaitingConsumers0).

is_expired(Ts, #?STATE{cfg = #cfg{expires = Expires},
last_active = LastActive,
consumers = Consumers})
last_active = LastActive,
consumers = Consumers})
when is_number(LastActive) andalso is_number(Expires) ->
%% TODO: should it be active consumers?
Active = maps:filter(fun (_, #consumer{status = suspected_down}) ->
Expand Down Expand Up @@ -2845,53 +2841,48 @@ priority_tag(Msg) ->
lo
end.

-define(CHECK_ENQ_MIN_INTERVAL_MS, 500).
-define(CHECK_ENQ_MIN_INDEXES, 4096).
-define(CHECK_MIN_INTERVAL_MS, 5000).
-define(CHECK_MIN_INDEXES, 65456).

do_checkpoints(Ts,
#checkpoint{index = ChIdx,
timestamp = ChTime,
enqueue_count = ChEnqCnt,
smallest_index = LastSmallest,
messages_total = LastMsgsTot} = Check0, RaAux) ->
indexes = Interval} = Check0, RaAux) ->
LastAppliedIdx = ra_aux:last_applied(RaAux),
#?STATE{enqueue_count = EnqCnt} = MacState = ra_aux:machine_state(RaAux),
MsgsTot = messages_total(MacState),
Mult = case MsgsTot > 200_000 of
true ->
min(4, MsgsTot div 100_000);
false ->
1
end,
Since = Ts - ChTime,
NewSmallest = case smallest_raft_index(MacState) of
undefined ->
LastAppliedIdx;
Smallest ->
Smallest
end,
{Check, Effects} = case (EnqCnt - ChEnqCnt > ?CHECK_ENQ_MIN_INDEXES andalso
Since > (?CHECK_ENQ_MIN_INTERVAL_MS * Mult)) orelse
(LastAppliedIdx - ChIdx > ?CHECK_MIN_INDEXES andalso
Since > (?CHECK_MIN_INTERVAL_MS * Mult)) orelse
(LastMsgsTot > 0 andalso MsgsTot == 0) of
true ->
%% take a checkpoint;
{#checkpoint{index = LastAppliedIdx,
timestamp = Ts,
enqueue_count = EnqCnt,
smallest_index = NewSmallest,
messages_total = MsgsTot},
[{checkpoint, LastAppliedIdx, MacState} |
release_cursor(LastSmallest, NewSmallest)]};
false ->
{Check0#checkpoint{smallest_index = NewSmallest},
release_cursor(LastSmallest, NewSmallest)}
end,

{Check, Effects}.
IndexesSince = LastAppliedIdx - ChIdx,
#?STATE{} = MacState = ra_aux:machine_state(RaAux),
TimeSince = Ts - ChTime,
NewSmallest = case smallest_raft_index(MacState) of
undefined ->
LastAppliedIdx;
Smallest ->
Smallest
end,
MsgsTot = messages_ready(MacState),
{CheckMinInterval, CheckMinIndexes, CheckMaxIndexes} =
persistent_term:get(quorum_queue_checkpoint_config,
{?CHECK_MIN_INTERVAL_MS, ?CHECK_MIN_INDEXES,
?CHECK_MAX_INDEXES}),

case (IndexesSince > Interval andalso
TimeSince > CheckMinInterval) orelse
%% the queue is empty and some commands have been
%% applied since the last snapshot
(IndexesSince > CheckMinIndexes andalso MsgsTot == 0) of
true ->
%% take fewer checkpoints the more messages there are on queue
NextIndexes = min(max(MsgsTot, CheckMinIndexes), CheckMaxIndexes),
%% take a checkpoint;
{#checkpoint{index = LastAppliedIdx,
timestamp = Ts,
smallest_index = NewSmallest,
messages_total = MsgsTot,
indexes = NextIndexes},
[{checkpoint, LastAppliedIdx, MacState} |
release_cursor(LastSmallest, NewSmallest)]};
false ->
{Check0#checkpoint{smallest_index = NewSmallest},
release_cursor(LastSmallest, NewSmallest)}
end.

release_cursor(LastSmallest, Smallest)
when is_integer(LastSmallest) andalso
Expand Down
27 changes: 16 additions & 11 deletions deps/rabbit/src/rabbit_fifo.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
%% Raw message data is always stored on disk.
-define(MSG(Index, Header), ?TUPLE(Index, Header)).

-define(NIL, []).

-define(IS_HEADER(H),
(is_integer(H) andalso H >= 0) orelse
is_list(H) orelse
Expand Down Expand Up @@ -97,8 +99,10 @@
-type applied_mfa() :: {module(), atom(), list()}.
% represents a partially applied module call

-define(RELEASE_CURSOR_EVERY, 2048 * 4).
-define(RELEASE_CURSOR_EVERY_MAX, 1_000_000).
-define(CHECK_MIN_INTERVAL_MS, 666).
-define(CHECK_MIN_INDEXES, 4096).
-define(CHECK_MAX_INDEXES, 1_000_000).

-define(USE_AVG_HALF_LIFE, 10000.0).
%% an average QQ without any message uses about 100KB so setting this limit
%% to ~10 times that should be relatively safe.
Expand Down Expand Up @@ -143,20 +147,20 @@
-record(enqueuer,
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
unused,
unused = ?NIL,
status = up :: up | suspected_down,
%% it is useful to have a record of when this was blocked
%% so that we can retry sending the block effect if
%% the publisher did not receive the initial one
blocked :: option(ra:index()),
unused_1,
unused_2
unused_1 = ?NIL,
unused_2 = ?NIL
}).

-record(cfg,
{name :: atom(),
resource :: rabbit_types:r('queue'),
release_cursor_interval :: option({non_neg_integer(), non_neg_integer()}),
unused_1 = ?NIL,
dead_letter_handler :: dead_letter_handler(),
become_leader_handler :: option(applied_mfa()),
overflow_strategy = drop_head :: drop_head | reject_publish,
Expand All @@ -168,8 +172,8 @@
delivery_limit :: option(non_neg_integer()),
expires :: option(milliseconds()),
msg_ttl :: option(milliseconds()),
unused_1,
unused_2
unused_2 = ?NIL,
unused_3 = ?NIL
}).

-record(rabbit_fifo,
Expand All @@ -191,7 +195,7 @@
% index when there are large gaps but should be faster than gb_trees
% for normal appending operations as it's backed by a map
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
unused_1,
unused_1 = ?NIL,
% consumers need to reflect consumer state at time of snapshot
consumers = #{} :: #{consumer_key() => consumer()},
% consumers that require further service are queued here
Expand All @@ -205,14 +209,15 @@
waiting_consumers = [] :: [{consumer_key(), consumer()}],
last_active :: option(non_neg_integer()),
msg_cache :: option({ra:index(), raw_msg()}),
unused_2
unused_2 = ?NIL
}).

-type config() :: #{name := atom(),
queue_resource := rabbit_types:r('queue'),
dead_letter_handler => dead_letter_handler(),
become_leader_handler => applied_mfa(),
release_cursor_interval => non_neg_integer(),
checkpoint_min_indexes => non_neg_integer(),
checkpoint_max_indexes => non_neg_integer(),
max_length => non_neg_integer(),
max_bytes => non_neg_integer(),
max_in_memory_length => non_neg_integer(),
Expand Down
4 changes: 0 additions & 4 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,6 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
OverflowBin = args_policy_lookup(<<"overflow">>, fun policyHasPrecedence/2, Q),
Overflow = overflow(OverflowBin, drop_head, QName),
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q),
MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q),
DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
Expires = args_policy_lookup(<<"expires">>, fun min/2, Q),
MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q),
Expand All @@ -326,8 +324,6 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
become_leader_handler => {?MODULE, become_leader, [QName]},
max_length => MaxLength,
max_bytes => MaxBytes,
max_in_memory_length => MaxMemoryLength,
max_in_memory_bytes => MaxMemoryBytes,
single_active_consumer_on => single_active_consumer_on(Q),
delivery_limit => DeliveryLimit,
overflow_strategy => Overflow,
Expand Down
37 changes: 37 additions & 0 deletions deps/rabbit/test/rabbit_fifo_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2737,7 +2737,44 @@ modify_test(Config) ->

ok.

ttb_test(Config) ->
S0 = init(#{name => ?FUNCTION_NAME,
queue_resource =>
rabbit_misc:r("/", queue, ?FUNCTION_NAME_B)}),


S1 = do_n(5_000_000,
fun (N, Acc) ->
I = (5_000_000 - N),
element(1, enq(Config, I, I, ?FUNCTION_NAME_B, Acc))
end, S0),



{T1, _Res} = timer:tc(fun () ->
do_n(100, fun (_, S) ->
term_to_binary(S),
S1 end, S1)
end),
ct:pal("T1 took ~bus", [T1]),


{T2, _} = timer:tc(fun () ->
do_n(100, fun (_, S) -> term_to_iovec(S), S1 end, S1)
end),
ct:pal("T2 took ~bus", [T2]),

ok.

%% Utility
%%

do_n(0, _, A) ->
A;
do_n(N, Fun, A0) ->
A = Fun(N, A0),
do_n(N-1, Fun, A).


init(Conf) -> rabbit_fifo:init(Conf).
make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid).
Expand Down

0 comments on commit b17f444

Please sign in to comment.