Skip to content

Commit

Permalink
Merge pull request #12023 from rabbitmq/mergify/bp/v4.0.x/pr-11964
Browse files Browse the repository at this point in the history
QQ: checkpointing frequency improvements (backport #11964)
  • Loading branch information
michaelklishin authored Aug 16, 2024
2 parents 4d003f9 + 212afbc commit c6aaa50
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 118 deletions.
1 change: 1 addition & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ rabbitmq_suite(
"@gen_batch_server//:erlang_app",
"@meck//:erlang_app",
"@ra//:erlang_app",
"//deps/rabbitmq_ct_helpers:erlang_app",
],
)

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1332,7 +1332,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
outs = ["test/rabbit_fifo_int_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app"],
deps = ["//deps/rabbit_common:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "rabbit_fifo_prop_SUITE_beam_files",
Expand Down
200 changes: 100 additions & 100 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ init(#{name := Name,
update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
RCI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
Overflow = maps:get(overflow_strategy, Conf, drop_head),
MaxLength = maps:get(max_length, Conf, undefined),
MaxBytes = maps:get(max_bytes, Conf, undefined),
Expand All @@ -206,11 +205,9 @@ update_config(Conf, State) ->
competing
end,
Cfg = State#?STATE.cfg,
RCISpec = {RCI, RCI},

LastActive = maps:get(created, Conf, undefined),
State#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCISpec,
dead_letter_handler = DLH,
State#?STATE{cfg = Cfg#cfg{dead_letter_handler = DLH,
become_leader_handler = BLH,
overflow_strategy = Overflow,
max_length = MaxLength,
Expand Down Expand Up @@ -485,7 +482,7 @@ apply(#{index := Index}, #purge{},
returns = lqueue:new(),
msg_bytes_enqueue = 0
},
Effects0 = [garbage_collection],
Effects0 = [{aux, force_checkpoint}, garbage_collection],
Reply = {purge, NumReady},
{State, _, Effects} = evaluate_limit(Index, false, State0,
State1, Effects0),
Expand Down Expand Up @@ -580,9 +577,8 @@ apply(#{system_time := Ts} = Meta,
Effects = [{monitor, node, Node} | Effects1],
checkout(Meta, State0, State#?STATE{enqueuers = Enqs,
last_active = Ts}, Effects);
apply(#{index := _Idx} = Meta, {down, Pid, _Info}, State0) ->
{State1, Effects1} = activate_next_consumer(
handle_down(Meta, Pid, State0)),
apply(Meta, {down, Pid, _Info}, State0) ->
{State1, Effects1} = activate_next_consumer(handle_down(Meta, Pid, State0)),
checkout(Meta, State0, State1, Effects1);
apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0,
enqueuers = Enqs0,
Expand Down Expand Up @@ -670,7 +666,8 @@ convert_v3_to_v4(#{} = _Meta, StateV3) ->
end, Returns0)),

Messages = rabbit_fifo_q:from_lqueue(Messages0),
#?STATE{cfg = rabbit_fifo_v3:get_field(cfg, StateV3),
Cfg = rabbit_fifo_v3:get_field(cfg, StateV3),
#?STATE{cfg = Cfg#cfg{unused_1 = ?NIL},
messages = Messages,
messages_total = rabbit_fifo_v3:get_field(messages_total, StateV3),
returns = Returns,
Expand Down Expand Up @@ -813,8 +810,7 @@ state_enter0(_, _, Effects) ->
Effects.

-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
tick(Ts, #?STATE{cfg = #cfg{name = _Name,
resource = QName}} = State) ->
tick(Ts, #?STATE{cfg = #cfg{resource = QName}} = State) ->
case is_expired(Ts, State) of
true ->
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}];
Expand All @@ -835,7 +831,6 @@ overview(#?STATE{consumers = Cons,
waiting_consumers = WaitingConsumers} = State) ->
Conf = #{name => Cfg#cfg.name,
resource => Cfg#cfg.resource,
release_cursor_interval => Cfg#cfg.release_cursor_interval,
dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler,
max_length => Cfg#cfg.max_length,
max_bytes => Cfg#cfg.max_bytes,
Expand Down Expand Up @@ -908,9 +903,10 @@ which_module(4) -> ?MODULE.

-record(checkpoint, {index :: ra:index(),
timestamp :: milliseconds(),
enqueue_count :: non_neg_integer(),
smallest_index :: undefined | ra:index(),
messages_total :: non_neg_integer()}).
messages_total :: non_neg_integer(),
indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
unused_1 = ?NIL}).
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
-record(aux, {name :: atom(),
capacity :: term(),
Expand All @@ -921,7 +917,6 @@ which_module(4) -> ?MODULE.
gc = #aux_gc{} :: #aux_gc{},
tick_pid :: undefined | pid(),
cache = #{} :: map(),
%% TODO: we need a state conversion for this
last_checkpoint :: #checkpoint{}}).

init_aux(Name) when is_atom(Name) ->
Expand All @@ -934,8 +929,8 @@ init_aux(Name) when is_atom(Name) ->
capacity = {inactive, Now, 1, 1.0},
last_checkpoint = #checkpoint{index = 0,
timestamp = erlang:system_time(millisecond),
enqueue_count = 0,
messages_total = 0}}.
messages_total = 0,
unused_1 = ?NIL}}.

handle_aux(RaftState, Tag, Cmd, #aux{name = Name,
capacity = Cap,
Expand All @@ -950,6 +945,35 @@ handle_aux(RaftState, Tag, Cmd, AuxV2, RaAux)
Name = element(2, AuxV2),
AuxV3 = init_aux(Name),
handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux);
%% `eval` cast handled while the raft state is `leader`.
%% Reads the current machine state, lets do_checkpoints/4 decide (Force =
%% false) whether a checkpoint is due, threads the resulting effects through
%% timer_effect/3 and, only when query_notify_decorators_info/1 returns a
%% value different from the cached last_decorators_state, prepends a
%% notify_decorators effect and caches the new value.
%% Always returns no_reply with the updated last_checkpoint.
handle_aux(leader, cast, eval,
#?AUX{last_decorators_state = LastDec,
last_checkpoint = Check0} = Aux0,
RaAux) ->
#?STATE{cfg = #cfg{resource = QName}} = MacState =
ra_aux:machine_state(RaAux),

Ts = erlang:system_time(millisecond),
%% Force = false: a checkpoint is only taken if do_checkpoints/4 decides so
{Check, Effects0} = do_checkpoints(Ts, Check0, RaAux, false),

%% this is called after each batch of commands has been applied
%% set timer for message expire
%% should really be the last applied index ts but this will have to do
Effects1 = timer_effect(Ts, MacState, Effects0),
case query_notify_decorators_info(MacState) of
LastDec ->
%% same value as cached in the aux state: no decorator notification
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects1};
{MaxActivePriority, IsEmpty} = NewLast ->
%% value changed: notify and remember it for the next eval
Effects = [notify_decorators_effect(QName, MaxActivePriority, IsEmpty)
| Effects1],
{no_reply, Aux0#?AUX{last_checkpoint = Check,
last_decorators_state = NewLast}, RaAux, Effects}
end;
%% `eval` cast for any raft state that did not match an earlier clause.
%% Only evaluates checkpointing (Force = false); unlike the `leader` clause it
%% adds no timer or decorator-notification effects.
handle_aux(_RaftState, cast, eval,
#?AUX{last_checkpoint = Check0} = Aux0,
RaAux) ->
Ts = erlang:system_time(millisecond),
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, false),
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects};
handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
consumer_key = Key} = Ret, Corr, Pid},
Aux0, RaAux0) ->
Expand All @@ -959,18 +983,18 @@ handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
case find_consumer(Key, Consumers) of
{ConsumerKey, #consumer{checked_out = Checked}} ->
{RaAux, ToReturn} =
maps:fold(
fun (MsgId, ?MSG(Idx, Header), {RA0, Acc}) ->
%% it is possible this is not found if the consumer
%% crashed and the message got removed
case ra_aux:log_fetch(Idx, RA0) of
{{_Term, _Meta, Cmd}, RA} ->
Msg = get_msg(Cmd),
{RA, [{MsgId, Idx, Header, Msg} | Acc]};
{undefined, RA} ->
{RA, Acc}
end
end, {RaAux0, []}, maps:with(MsgIds, Checked)),
maps:fold(
fun (MsgId, ?MSG(Idx, Header), {RA0, Acc}) ->
%% it is possible this is not found if the consumer
%% crashed and the message got removed
case ra_aux:log_fetch(Idx, RA0) of
{{_Term, _Meta, Cmd}, RA} ->
Msg = get_msg(Cmd),
{RA, [{MsgId, Idx, Header, Msg} | Acc]};
{undefined, RA} ->
{RA, Acc}
end
end, {RaAux0, []}, maps:with(MsgIds, Checked)),

Appends = make_requeue(ConsumerKey, {notify, Corr, Pid},
lists:sort(ToReturn), []),
Expand Down Expand Up @@ -1020,35 +1044,6 @@ handle_aux(_, _, {get_checked_out, ConsumerKey, MsgIds}, Aux0, RaAux0) ->
_ ->
{reply, {error, consumer_not_found}, Aux0, RaAux0}
end;
handle_aux(leader, cast, eval,
#?AUX{last_decorators_state = LastDec,
last_checkpoint = Check0} = Aux0,
RaAux) ->
#?STATE{cfg = #cfg{resource = QName}} = MacState =
ra_aux:machine_state(RaAux),

Ts = erlang:system_time(millisecond),
{Check, Effects0} = do_checkpoints(Ts, Check0, RaAux),

%% this is called after each batch of commands have been applied
%% set timer for message expire
%% should really be the last applied index ts but this will have to do
Effects1 = timer_effect(Ts, MacState, Effects0),
case query_notify_decorators_info(MacState) of
LastDec ->
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects1};
{MaxActivePriority, IsEmpty} = NewLast ->
Effects = [notify_decorators_effect(QName, MaxActivePriority, IsEmpty)
| Effects1],
{no_reply, Aux0#?AUX{last_checkpoint = Check,
last_decorators_state = NewLast}, RaAux, Effects}
end;
handle_aux(_RaftState, cast, eval,
#?AUX{last_checkpoint = Check0} = Aux0,
RaAux) ->
Ts = erlang:system_time(millisecond),
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux),
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects};
handle_aux(_RaState, cast, Cmd, #?AUX{capacity = Use0} = Aux0, RaAux)
when Cmd == active orelse Cmd == inactive ->
{no_reply, Aux0#?AUX{capacity = update_use(Use0, Cmd)}, RaAux};
Expand Down Expand Up @@ -1107,6 +1102,11 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0,
end;
handle_aux(_, _, garbage_collection, Aux, RaAux) ->
{no_reply, force_eval_gc(RaAux, Aux), RaAux};
%% `force_checkpoint` aux command, accepted in any raft state and for any
%% message type (cast or call).
%% Calls do_checkpoints/4 with Force = true and stores the returned checkpoint
%% record in the aux state; the effects it returns are passed on unchanged.
handle_aux(_RaState, _, force_checkpoint,
#?AUX{last_checkpoint = Check0} = Aux, RaAux) ->
Ts = erlang:system_time(millisecond),
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, true),
{no_reply, Aux#?AUX{last_checkpoint= Check}, RaAux, Effects};
handle_aux(RaState, _, {dlx, _} = Cmd, Aux0, RaAux) ->
#?STATE{dlx = DlxState,
cfg = #cfg{dead_letter_handler = DLH,
Expand Down Expand Up @@ -2639,8 +2639,8 @@ suspected_pids_for(Node, #?STATE{consumers = Cons0,
end, Enqs, WaitingConsumers0).

is_expired(Ts, #?STATE{cfg = #cfg{expires = Expires},
last_active = LastActive,
consumers = Consumers})
last_active = LastActive,
consumers = Consumers})
when is_number(LastActive) andalso is_number(Expires) ->
%% TODO: should it be active consumers?
Active = maps:filter(fun (_, #consumer{status = suspected_down}) ->
Expand Down Expand Up @@ -2845,53 +2845,53 @@ priority_tag(Msg) ->
lo
end.

-define(CHECK_ENQ_MIN_INTERVAL_MS, 500).
-define(CHECK_ENQ_MIN_INDEXES, 4096).
-define(CHECK_MIN_INTERVAL_MS, 5000).
-define(CHECK_MIN_INDEXES, 65456).

do_checkpoints(Ts,
#checkpoint{index = ChIdx,
timestamp = ChTime,
enqueue_count = ChEnqCnt,
smallest_index = LastSmallest,
messages_total = LastMsgsTot} = Check0, RaAux) ->
indexes = MinIndexes} = Check0, RaAux, Force) ->
LastAppliedIdx = ra_aux:last_applied(RaAux),
#?STATE{enqueue_count = EnqCnt} = MacState = ra_aux:machine_state(RaAux),
IndexesSince = LastAppliedIdx - ChIdx,
#?STATE{} = MacState = ra_aux:machine_state(RaAux),
TimeSince = Ts - ChTime,
NewSmallest = case smallest_raft_index(MacState) of
undefined ->
LastAppliedIdx;
Smallest ->
Smallest
end,
MsgsTot = messages_total(MacState),
Mult = case MsgsTot > 200_000 of
true ->
min(4, MsgsTot div 100_000);
false ->
1
end,
Since = Ts - ChTime,
NewSmallest = case smallest_raft_index(MacState) of
undefined ->
LastAppliedIdx;
Smallest ->
Smallest
end,
{Check, Effects} = case (EnqCnt - ChEnqCnt > ?CHECK_ENQ_MIN_INDEXES andalso
Since > (?CHECK_ENQ_MIN_INTERVAL_MS * Mult)) orelse
(LastAppliedIdx - ChIdx > ?CHECK_MIN_INDEXES andalso
Since > (?CHECK_MIN_INTERVAL_MS * Mult)) orelse
(LastMsgsTot > 0 andalso MsgsTot == 0) of
true ->
%% take a checkpoint;
{#checkpoint{index = LastAppliedIdx,
timestamp = Ts,
enqueue_count = EnqCnt,
smallest_index = NewSmallest,
messages_total = MsgsTot},
[{checkpoint, LastAppliedIdx, MacState} |
release_cursor(LastSmallest, NewSmallest)]};
false ->
{Check0#checkpoint{smallest_index = NewSmallest},
release_cursor(LastSmallest, NewSmallest)}
end,

{Check, Effects}.
{CheckMinInterval, CheckMinIndexes, CheckMaxIndexes} =
persistent_term:get(quorum_queue_checkpoint_config,
{?CHECK_MIN_INTERVAL_MS, ?CHECK_MIN_INDEXES,
?CHECK_MAX_INDEXES}),
EnoughTimeHasPassed = TimeSince > CheckMinInterval,

%% enough time has passed and enough indexes have been committed
case (IndexesSince > MinIndexes andalso
EnoughTimeHasPassed) orelse
%% the queue is empty and some commands have been
%% applied since the last checkpoint
(MsgsTot == 0 andalso
IndexesSince > CheckMinIndexes andalso
EnoughTimeHasPassed) orelse
Force of
true ->
%% take fewer checkpoints the more messages there are on queue
NextIndexes = min(max(MsgsTot, CheckMinIndexes), CheckMaxIndexes),
%% take a checkpoint;
{#checkpoint{index = LastAppliedIdx,
timestamp = Ts,
smallest_index = NewSmallest,
messages_total = MsgsTot,
indexes = NextIndexes},
[{checkpoint, LastAppliedIdx, MacState} |
release_cursor(LastSmallest, NewSmallest)]};
false ->
{Check0#checkpoint{smallest_index = NewSmallest},
release_cursor(LastSmallest, NewSmallest)}
end.

release_cursor(LastSmallest, Smallest)
when is_integer(LastSmallest) andalso
Expand Down
Loading

0 comments on commit c6aaa50

Please sign in to comment.