diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 7d357beadc13..6156173d15d3 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -192,7 +192,6 @@ init(#{name := Name, update_config(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), - RCI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY), Overflow = maps:get(overflow_strategy, Conf, drop_head), MaxLength = maps:get(max_length, Conf, undefined), MaxBytes = maps:get(max_bytes, Conf, undefined), @@ -206,11 +205,9 @@ update_config(Conf, State) -> competing end, Cfg = State#?STATE.cfg, - RCISpec = {RCI, RCI}, LastActive = maps:get(created, Conf, undefined), - State#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCISpec, - dead_letter_handler = DLH, + State#?STATE{cfg = Cfg#cfg{dead_letter_handler = DLH, become_leader_handler = BLH, overflow_strategy = Overflow, max_length = MaxLength, @@ -580,9 +577,8 @@ apply(#{system_time := Ts} = Meta, Effects = [{monitor, node, Node} | Effects1], checkout(Meta, State0, State#?STATE{enqueuers = Enqs, last_active = Ts}, Effects); -apply(#{index := _Idx} = Meta, {down, Pid, _Info}, State0) -> - {State1, Effects1} = activate_next_consumer( - handle_down(Meta, Pid, State0)), +apply(Meta, {down, Pid, _Info}, State0) -> + {State1, Effects1} = activate_next_consumer(handle_down(Meta, Pid, State0)), checkout(Meta, State0, State1, Effects1); apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0, enqueuers = Enqs0, @@ -670,7 +666,8 @@ convert_v3_to_v4(#{} = _Meta, StateV3) -> end, Returns0)), Messages = rabbit_fifo_q:from_lqueue(Messages0), - #?STATE{cfg = rabbit_fifo_v3:get_field(cfg, StateV3), + Cfg = rabbit_fifo_v3:get_field(cfg, StateV3), + #?STATE{cfg = Cfg#cfg{unused_1 = ?NIL}, messages = Messages, messages_total = rabbit_fifo_v3:get_field(messages_total, StateV3), returns = Returns, @@ -813,8 +810,7 @@ state_enter0(_, _, Effects) -> Effects. -spec tick(non_neg_integer(), state()) -> ra_machine:effects(). -tick(Ts, #?STATE{cfg = #cfg{name = _Name, - resource = QName}} = State) -> +tick(Ts, #?STATE{cfg = #cfg{resource = QName}} = State) -> case is_expired(Ts, State) of true -> [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]; @@ -835,7 +831,6 @@ overview(#?STATE{consumers = Cons, waiting_consumers = WaitingConsumers} = State) -> Conf = #{name => Cfg#cfg.name, resource => Cfg#cfg.resource, - release_cursor_interval => Cfg#cfg.release_cursor_interval, dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler, max_length => Cfg#cfg.max_length, max_bytes => Cfg#cfg.max_bytes, @@ -908,9 +903,10 @@ which_module(4) -> ?MODULE. -record(checkpoint, {index :: ra:index(), timestamp :: milliseconds(), - enqueue_count :: non_neg_integer(), smallest_index :: undefined | ra:index(), - messages_total :: non_neg_integer()}). + messages_total :: non_neg_integer(), + indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(), + unused_1 = ?NIL}). -record(aux_gc, {last_raft_idx = 0 :: ra:index()}). -record(aux, {name :: atom(), capacity :: term(), @@ -934,8 +930,8 @@ init_aux(Name) when is_atom(Name) -> capacity = {inactive, Now, 1, 1.0}, last_checkpoint = #checkpoint{index = 0, timestamp = erlang:system_time(millisecond), - enqueue_count = 0, - messages_total = 0}}. + messages_total = 0, + unused_1 = ?NIL}}. handle_aux(RaftState, Tag, Cmd, #aux{name = Name, capacity = Cap, @@ -959,18 +955,18 @@ handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds, case find_consumer(Key, Consumers) of {ConsumerKey, #consumer{checked_out = Checked}} -> {RaAux, ToReturn} = - maps:fold( - fun (MsgId, ?MSG(Idx, Header), {RA0, Acc}) -> - %% it is possible this is not found if the consumer - %% crashed and the message got removed - case ra_aux:log_fetch(Idx, RA0) of - {{_Term, _Meta, Cmd}, RA} -> - Msg = get_msg(Cmd), - {RA, [{MsgId, Idx, Header, Msg} | Acc]}; - {undefined, RA} -> - {RA, Acc} - end - end, {RaAux0, []}, maps:with(MsgIds, Checked)), + maps:fold( + fun (MsgId, ?MSG(Idx, Header), {RA0, Acc}) -> + %% it is possible this is not found if the consumer + %% crashed and the message got removed + case ra_aux:log_fetch(Idx, RA0) of + {{_Term, _Meta, Cmd}, RA} -> + Msg = get_msg(Cmd), + {RA, [{MsgId, Idx, Header, Msg} | Acc]}; + {undefined, RA} -> + {RA, Acc} + end + end, {RaAux0, []}, maps:with(MsgIds, Checked)), Appends = make_requeue(ConsumerKey, {notify, Corr, Pid}, lists:sort(ToReturn), []), @@ -2639,8 +2635,8 @@ suspected_pids_for(Node, #?STATE{consumers = Cons0, end, Enqs, WaitingConsumers0). is_expired(Ts, #?STATE{cfg = #cfg{expires = Expires}, - last_active = LastActive, - consumers = Consumers}) + last_active = LastActive, + consumers = Consumers}) when is_number(LastActive) andalso is_number(Expires) -> %% TODO: should it be active consumers? Active = maps:filter(fun (_, #consumer{status = suspected_down}) -> @@ -2845,53 +2841,48 @@ priority_tag(Msg) -> lo end. --define(CHECK_ENQ_MIN_INTERVAL_MS, 500). --define(CHECK_ENQ_MIN_INDEXES, 4096). --define(CHECK_MIN_INTERVAL_MS, 5000). --define(CHECK_MIN_INDEXES, 65456). do_checkpoints(Ts, #checkpoint{index = ChIdx, timestamp = ChTime, - enqueue_count = ChEnqCnt, smallest_index = LastSmallest, - messages_total = LastMsgsTot} = Check0, RaAux) -> + indexes = Interval} = Check0, RaAux) -> LastAppliedIdx = ra_aux:last_applied(RaAux), - #?STATE{enqueue_count = EnqCnt} = MacState = ra_aux:machine_state(RaAux), - MsgsTot = messages_total(MacState), - Mult = case MsgsTot > 200_000 of - true -> - min(4, MsgsTot div 100_000); - false -> - 1 - end, - Since = Ts - ChTime, - NewSmallest = case smallest_raft_index(MacState) of - undefined -> - LastAppliedIdx; - Smallest -> - Smallest - end, - {Check, Effects} = case (EnqCnt - ChEnqCnt > ?CHECK_ENQ_MIN_INDEXES andalso - Since > (?CHECK_ENQ_MIN_INTERVAL_MS * Mult)) orelse - (LastAppliedIdx - ChIdx > ?CHECK_MIN_INDEXES andalso - Since > (?CHECK_MIN_INTERVAL_MS * Mult)) orelse - (LastMsgsTot > 0 andalso MsgsTot == 0) of - true -> - %% take a checkpoint; - {#checkpoint{index = LastAppliedIdx, - timestamp = Ts, - enqueue_count = EnqCnt, - smallest_index = NewSmallest, - messages_total = MsgsTot}, - [{checkpoint, LastAppliedIdx, MacState} | - release_cursor(LastSmallest, NewSmallest)]}; - false -> - {Check0#checkpoint{smallest_index = NewSmallest}, - release_cursor(LastSmallest, NewSmallest)} - end, - - {Check, Effects}. + IndexesSince = LastAppliedIdx - ChIdx, + #?STATE{} = MacState = ra_aux:machine_state(RaAux), + TimeSince = Ts - ChTime, + NewSmallest = case smallest_raft_index(MacState) of + undefined -> + LastAppliedIdx; + Smallest -> + Smallest + end, + MsgsTot = messages_ready(MacState), + {CheckMinInterval, CheckMinIndexes, CheckMaxIndexes} = + persistent_term:get(quorum_queue_checkpoint_config, + {?CHECK_MIN_INTERVAL_MS, ?CHECK_MIN_INDEXES, + ?CHECK_MAX_INDEXES}), + + case (IndexesSince > Interval andalso + TimeSince > CheckMinInterval) orelse + %% the queue is empty and some commands have been + %% applied since the last snapshot + (IndexesSince > CheckMinIndexes andalso MsgsTot == 0) of + true -> + %% take fewer checkpoints the more messages there are on queue + NextIndexes = min(max(MsgsTot, CheckMinIndexes), CheckMaxIndexes), + %% take a checkpoint; + {#checkpoint{index = LastAppliedIdx, + timestamp = Ts, + smallest_index = NewSmallest, + messages_total = MsgsTot, + indexes = NextIndexes}, + [{checkpoint, LastAppliedIdx, MacState} | + release_cursor(LastSmallest, NewSmallest)]}; + false -> + {Check0#checkpoint{smallest_index = NewSmallest}, + release_cursor(LastSmallest, NewSmallest)} + end. release_cursor(LastSmallest, Smallest) when is_integer(LastSmallest) andalso diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index a436b5df8adf..8e3602453efd 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -12,6 +12,8 @@ %% Raw message data is always stored on disk. -define(MSG(Index, Header), ?TUPLE(Index, Header)). +-define(NIL, []). + -define(IS_HEADER(H), (is_integer(H) andalso H >= 0) orelse is_list(H) orelse @@ -97,8 +99,10 @@ -type applied_mfa() :: {module(), atom(), list()}. % represents a partially applied module call --define(RELEASE_CURSOR_EVERY, 2048 * 4). --define(RELEASE_CURSOR_EVERY_MAX, 1_000_000). +-define(CHECK_MIN_INTERVAL_MS, 666). +-define(CHECK_MIN_INDEXES, 4096). +-define(CHECK_MAX_INDEXES, 1_000_000). + -define(USE_AVG_HALF_LIFE, 10000.0). %% an average QQ without any message uses about 100KB so setting this limit %% to ~10 times that should be relatively safe. @@ -143,20 +147,20 @@ -record(enqueuer, {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list - unused, + unused = ?NIL, status = up :: up | suspected_down, %% it is useful to have a record of when this was blocked %% so that we can retry sending the block effect if %% the publisher did not receive the initial one blocked :: option(ra:index()), - unused_1, - unused_2 + unused_1 = ?NIL, + unused_2 = ?NIL }). -record(cfg, {name :: atom(), resource :: rabbit_types:r('queue'), - release_cursor_interval :: option({non_neg_integer(), non_neg_integer()}), + unused_1 = ?NIL, dead_letter_handler :: dead_letter_handler(), become_leader_handler :: option(applied_mfa()), overflow_strategy = drop_head :: drop_head | reject_publish, @@ -168,8 +172,8 @@ delivery_limit :: option(non_neg_integer()), expires :: option(milliseconds()), msg_ttl :: option(milliseconds()), - unused_1, - unused_2 + unused_2 = ?NIL, + unused_3 = ?NIL }). -record(rabbit_fifo, @@ -191,7 +195,7 @@ % index when there are large gaps but should be faster than gb_trees % for normal appending operations as it's backed by a map ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(), - unused_1, + unused_1 = ?NIL, % consumers need to reflect consumer state at time of snapshot consumers = #{} :: #{consumer_key() => consumer()}, % consumers that require further service are queued here @@ -205,14 +209,15 @@ waiting_consumers = [] :: [{consumer_key(), consumer()}], last_active :: option(non_neg_integer()), msg_cache :: option({ra:index(), raw_msg()}), - unused_2 + unused_2 = ?NIL }). -type config() :: #{name := atom(), queue_resource := rabbit_types:r('queue'), dead_letter_handler => dead_letter_handler(), become_leader_handler => applied_mfa(), - release_cursor_interval => non_neg_integer(), + checkpoint_min_indexes => non_neg_integer(), + checkpoint_max_indexes => non_neg_integer(), max_length => non_neg_integer(), max_bytes => non_neg_integer(), max_in_memory_length => non_neg_integer(), diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index e9a492a66881..9084c1369a6d 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -315,8 +315,6 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> OverflowBin = args_policy_lookup(<<"overflow">>, fun policyHasPrecedence/2, Q), Overflow = overflow(OverflowBin, drop_head, QName), MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), - MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q), - MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q), DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q), Expires = args_policy_lookup(<<"expires">>, fun min/2, Q), MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q), @@ -326,8 +324,6 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> become_leader_handler => {?MODULE, become_leader, [QName]}, max_length => MaxLength, max_bytes => MaxBytes, - max_in_memory_length => MaxMemoryLength, - max_in_memory_bytes => MaxMemoryBytes, single_active_consumer_on => single_active_consumer_on(Q), delivery_limit => DeliveryLimit, overflow_strategy => Overflow, diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 753704affd09..a3608f26ef46 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -2737,7 +2737,44 @@ modify_test(Config) -> ok. +ttb_test(Config) -> + S0 = init(#{name => ?FUNCTION_NAME, + queue_resource => + rabbit_misc:r("/", queue, ?FUNCTION_NAME_B)}), + + + S1 = do_n(5_000_000, + fun (N, Acc) -> + I = (5_000_000 - N), + element(1, enq(Config, I, I, ?FUNCTION_NAME_B, Acc)) + end, S0), + + + + {T1, _Res} = timer:tc(fun () -> + do_n(100, fun (_, S) -> + term_to_binary(S), + S1 end, S1) + end), + ct:pal("T1 took ~bus", [T1]), + + + {T2, _} = timer:tc(fun () -> + do_n(100, fun (_, S) -> term_to_iovec(S), S1 end, S1) + end), + ct:pal("T2 took ~bus", [T2]), + + ok. + %% Utility +%% + +do_n(0, _, A) -> + A; +do_n(N, Fun, A0) -> + A = Fun(N, A0), + do_n(N-1, Fun, A). + init(Conf) -> rabbit_fifo:init(Conf). make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid).