Skip to content


feat: add "pop at least N bytes" functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed Jan 31, 2025
1 parent 1d665e2 commit cdcdc18
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 11 deletions.
39 changes: 28 additions & 11 deletions src/replayq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,15 @@ append(#{config := #{seg_bytes := BytesLimit, dir := Dir} = Config,
}) ->
{q(), ack_ref(), [item()]}.
pop(Q, Opts) ->
Bytes = maps:get(bytes_limit, Opts, ?DEFAULT_POP_BYTES_LIMIT),
{BytesMode, Bytes} = case maps:get(bytes_limit, Opts, ?DEFAULT_POP_BYTES_LIMIT) of
{Mode0, Bytes0} -> {Mode0, Bytes0};
Bytes0 when is_integer(Bytes0) -> {at_most, Bytes0}
Count = maps:get(count_limit, Opts, ?DEFAULT_POP_COUNT_LIMIT),
{StopFun, StopFunAcc} =
maps:get(stop_before, Opts, {fun ?MODULE:default_stop_before_func/2, none}),
true = (Count > 0),
pop(Q, Bytes, Count, ?NOTHING_TO_ACK, [], StopFun, StopFunAcc).
pop(Q, BytesMode, Bytes, Count, ?NOTHING_TO_ACK, [], StopFun, StopFunAcc).

%% @doc peek the queue front item.
-spec peek(q()) -> empty | item().
Expand Down Expand Up @@ -327,25 +330,30 @@ transform(Id, [Item0 | Rest], Sizer, Count, Bytes, Acc) ->
append_in_mem([], Q) -> Q;
append_in_mem([Item | Rest], Q) -> append_in_mem(Rest, queue:in(Item, Q)).

pop(Q, _Bytes, 0, AckRef, Acc, _StopFun, _StopFunAcc) ->
pop(Q, _BytesMode, _Bytes, 0, AckRef, Acc, _StopFun, _StopFunAcc) ->
Result = lists:reverse(Acc),
ok = maybe_save_pending_acks(AckRef, Q, Result),
{Q, AckRef, Result};
pop(#{config := Cfg} = Q, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc) ->
pop(Q, _BytesMode, Bytes, _Count, AckRef, Acc, _StopFun, _StopFunAcc) when Bytes =< 0 ->
Result = lists:reverse(Acc),
ok = maybe_save_pending_acks(AckRef, Q, Result),
{Q, AckRef, Result};
pop(#{config := Cfg} = Q, BytesMode, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc) ->
case is_empty(Q) of
true ->
{Q, AckRef, lists:reverse(Acc)};
false when Cfg =:= mem_only ->
pop_mem(Q, Bytes, Count, Acc, StopFun, StopFunAcc);
pop_mem(Q, BytesMode, Bytes, Count, Acc, StopFun, StopFunAcc);
false ->
pop2(Q, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc)
pop2(Q, BytesMode, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc)

pop_mem(#{in_mem := InMem,
stats := #{count := TotalCount, bytes := TotalBytes} = Stats
} = Q, Bytes, Count, Acc, StopFun, StopFunAcc) ->
} = Q, BytesMode, Bytes, Count, Acc, StopFun, StopFunAcc) ->
case queue:out(InMem) of
{{value, ?MEM_ONLY_ITEM(Sz, _Item)}, _} when Sz > Bytes andalso Acc =/= [] ->
{{value, ?MEM_ONLY_ITEM(Sz, _Item)}, _} when
BytesMode == at_most andalso Sz > Bytes andalso Acc =/= [] ->
{Q, ?NOTHING_TO_ACK, lists:reverse(Acc)};
{{value, ?MEM_ONLY_ITEM(Sz, Item)}, Rest} ->
case StopFun(Item, StopFunAcc) of
Expand All @@ -358,6 +366,7 @@ pop_mem(#{in_mem := InMem,
Bytes - Sz,
Count - 1,
Expand All @@ -370,12 +379,13 @@ pop_mem(#{in_mem := InMem,
pop2(#{head_segno := ReaderSegno,
in_mem := HeadItems,
stats := #{count := TotalCount, bytes := TotalBytes} = Stats
} = Q, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc) ->
} = Q, BytesMode, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc) ->
case queue:out(HeadItems) of
{empty, _} ->
Q1 = open_next_seg(Q),
pop(Q1, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc);
{{value, ?DISK_CP_ITEM(_, Sz, _Item)}, _} when Sz > Bytes andalso Acc =/= [] ->
pop(Q1, BytesMode, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc);
{{value, ?DISK_CP_ITEM(_, Sz, _Item)}, _} when
BytesMode == at_most andalso Sz > Bytes andalso Acc =/= [] ->
%% taking the head item would cause exceeding size limit
{Q, AckRef, lists:reverse(Acc)};
{{value, ?DISK_CP_ITEM(Id, Sz, Item)}, Rest} ->
Expand All @@ -395,6 +405,7 @@ pop2(#{head_segno := ReaderSegno,
NewAckRef = {ReaderSegno, Id},
Bytes - Sz,
Count - 1,
Expand Down Expand Up @@ -760,3 +771,9 @@ is_segment_full(#{bytes := SegmentBytes},
%% We can change implementation to split items list to avoid
%% segment overflow if really necessary
SegmentBytes >= SegmentBytesLimit.

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
%%% erlang-indent-level: 2
%%% End:
43 changes: 43 additions & 0 deletions test/replayq_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,43 @@ corrupted_commit_test() ->
ok = replayq:close(Q4),
ok = cleanup(Dir).

pop_at_least_bytes_mem_test() ->
Config = #{
mem_only => true,
seg_bytes => 1000,
sizer => fun(Item) -> size(Item) end

pop_at_least_bytes_disk_test() ->
Dir = ?DIR,
Config = #{
dir => Dir,
seg_bytes => 1000,
sizer => fun(Item) -> size(Item) end
ok = cleanup(Dir).

test_pop_at_least_bytes(Config) ->
Q0 = replayq:open(Config),
%% Two 5 bytes elements
Item1 = <<"12345">>,
Item2 = <<"67890">>,
Q1 = replayq:append(Q0, [Item1, Item2]),
ItemSize = 5,
?assertEqual(ItemSize * 2, replayq:bytes(Q1)),
%% Default behavior: we pop _at most_ N bytes, and return at least 1 item, if any.
%% Asking for less bytes than the 2 elements should yield singleton batch.
{_Q2, _Ack2, [Item1]} = replayq:pop(Q1, #{count_limit => 10, bytes_limit => ItemSize - 1}),
{_Q3, _Ack3, [Item1]} = replayq:pop(Q1, #{count_limit => 10, bytes_limit => {at_most, ItemSize - 1}}),
%% ... but dropping _at least_ less bytes than 1 item should drop both of them.
{Q4, _Ack4, [Item1, Item2]} =
replayq:pop(Q1, #{count_limit => 10, bytes_limit => {at_least, ItemSize + 1}}),
?assertEqual(0, replayq:bytes(Q4)),
ok = replayq:close(Q4),

%% helpers ===========================================================

cleanup(Dir) ->
Expand Down Expand Up @@ -586,3 +623,9 @@ filename(Segno) ->
%% corrupt the segment
corrupt(#{w_cur := #{fd := Fd}}) ->
file:write(Fd, "some random bytes").

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
%%% erlang-indent-level: 2
%%% End:

0 comments on commit cdcdc18

Please sign in to comment.