Merge pull request #81 from zmstone/fix-pending-ack-handling
Fix pending ack handling for buffer overflow drops
zmstone authored Oct 9, 2024
2 parents ea56a87 + a67da3b commit a7e5b04
Showing 5 changed files with 142 additions and 38 deletions.
changelog.md (3 changes: 2 additions & 1 deletion)
@@ -1,5 +1,6 @@
* 4.0.2
- Fix dynamic topic producer initialization failure handling.
- Fix dynamic topic producer initialization failure handling (introduced in 3.0.0).
- Fix `unexpected_id` crash on replayq overflow (introduced in 4.0.1).

* 4.0.1
- Minimize callback context for sync call.
src/wolff_pendack.erl (76 changes: 54 additions & 22 deletions)
@@ -21,15 +21,19 @@
%% time in microseconds.
-module(wolff_pendack).

-export([new/0, count/1, insert/2, take/2]).
-export([new/0, count/1, insert_backlog/2]).
-export([take_inflight/2, drop_backlog/2, move_backlog_to_inflight/2]).

-export_type([acks/0]).

-type call_id() :: pos_integer().
-type key() :: call_id() | {call_id(), call_id()}.
-type cb() :: term().
-opaque acks() :: #{next_id := integer(),
cbs := queue:queue({key(), cb()})}.
backlog := queue:queue({key(), cb()}),
inflight := queue:queue({key(), cb()}),
count := non_neg_integer()
}.

%% @doc Initialize a new data structure.
new() ->
@@ -38,26 +42,21 @@ new() ->
%% the process crashed or node restarted.
Now = erlang:system_time(microsecond),
#{next_id => Now,
cbs => queue:new()
backlog => queue:new(),
inflight => queue:new(),
count => 0
}.

%% @doc count the total number of pending acks.
-spec count(acks()) -> non_neg_integer().
count(#{cbs := Cbs}) ->
sum(queue:to_list(Cbs), 0).

sum([], Acc) ->
Acc;
sum([{CallId, _} | Rest], Acc) when is_integer(CallId) ->
sum(Rest, Acc + 1);
sum([{{MinCallId, MaxCallId}, _} | Rest], Acc) ->
sum(Rest, Acc + MaxCallId - MinCallId + 1).

%% @doc insert a callback into the data structure.
-spec insert(acks(), cb()) -> {call_id(), acks()}.
insert(#{next_id := Id, cbs := Cbs} = X, Cb) ->
count(#{count := Count}) ->
Count.

%% @doc insert a callback into the backlog queue.
-spec insert_backlog(acks(), cb()) -> {call_id(), acks()}.
insert_backlog(#{next_id := Id, backlog := Cbs, count := Count} = X, Cb) ->
NewCbs = insert_cb(Cbs, Id, Cb),
{Id, X#{next_id => Id + 1, cbs => NewCbs}}.
{Id, X#{next_id => Id + 1, backlog => NewCbs, count => Count + 1}}.

insert_cb(Cbs, Id, Cb) ->
case queue:out_r(Cbs) of
@@ -84,17 +83,17 @@ expand_id({MinId, MaxId}, Id) ->
Id =:= MaxId + 1 orelse error({unexpected_id, {MinId, MaxId}, Id}),
{MinId, Id}.

%% @doc Take the callback for a given call ID.
%% The ID is expected to be the oldest in the queue.
%% @doc Take the callback from the inflight queue.
%% The ID is expected to be the oldest in the inflight queue.
%% Return the callback and the updated data structure.
-spec take(acks(), call_id()) -> {ok, cb(), acks()} | false.
take(#{cbs := Cbs} = X, Id) ->
-spec take_inflight(acks(), call_id()) -> {ok, cb(), acks()} | false.
take_inflight(#{inflight := Cbs, count := Count} = X, Id) ->
case take1(Cbs, Id) of
false ->
%% stale ack
false;
{ok, Cb, Cbs1} ->
{ok, Cb, X#{cbs => Cbs1}}
{ok, Cb, X#{inflight => Cbs1, count => Count - 1}}
end.

take1(Cbs0, Id) ->
@@ -120,3 +119,36 @@ take2(Cbs, {MinId, MaxId}, Cb, Id) when Id =:= MinId ->
end;
take2(_Cbs, {MinId, MaxId}, _Cb, Id) ->
error(#{cause => unexpected_id, min => MinId, max => MaxId, got => Id}).

%% @doc Drop calls from the head (older end) of the backlog queue.
%% Return the list of dropped callbacks and the remaining data structure.
-spec drop_backlog(acks(), [call_id()]) -> {[cb()], acks()}.
drop_backlog(X, Ids) ->
drop1(X, Ids, []).

drop1(X, [], Acc) ->
{lists:reverse(Acc), X};
drop1(#{backlog := Cbs0, count := Count} = X, [Id | Ids], Acc) ->
case take1(Cbs0, Id) of
false ->
drop1(X, Ids, Acc);
{ok, Cb, Cbs} ->
drop1(X#{backlog := Cbs, count := Count - 1}, Ids, [Cb | Acc])
end.

%% @doc Move a list of calls from the head of the backlog queue
%% and push them to the inflight queue.
-spec move_backlog_to_inflight(acks(), [call_id()]) -> acks().
move_backlog_to_inflight(X, []) ->
X;
move_backlog_to_inflight(X, [Id | Ids]) ->
move_backlog_to_inflight(move1(X, Id), Ids).

move1(#{backlog := Backlog0, inflight := Inflight0} = X, Id) ->
case take1(Backlog0, Id) of
false ->
X;
{ok, Cb, Backlog} ->
Inflight = insert_cb(Inflight0, Id, Cb),
X#{backlog := Backlog, inflight := Inflight}
end.
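
For orientation, here is a minimal usage sketch of the reworked pendack API. It is not part of this commit: it walks a single call through the new backlog/inflight split, with an arity-1 fun standing in for wolff's real ack callback.

%% Minimal sketch (not from this commit): one call's lifecycle across the two
%% queues. The arity-1 fun is an illustrative stand-in for the ack callback.
pendack_lifecycle_sketch() ->
    Acks0 = wolff_pendack:new(),
    %% enqueue: the callback is parked in the backlog queue
    {CallId, Acks1} =
        wolff_pendack:insert_backlog(Acks0, fun(BaseOffset) -> {acked, BaseOffset} end),
    1 = wolff_pendack:count(Acks1),
    %% the batch is popped from replayq and handed to Kafka: move to inflight
    Acks2 = wolff_pendack:move_backlog_to_inflight(Acks1, [CallId]),
    %% Kafka replies: the callback is taken from the inflight queue only
    {ok, Cb, Acks3} = wolff_pendack:take_inflight(Acks2, CallId),
    {acked, 42} = Cb(42),
    0 = wolff_pendack:count(Acks3),
    %% had replayq overflowed while the call was still backlogged (Acks1),
    %% the callback would have been discarded from the backlog instead
    {[_DroppedCb], _AcksAfterDrop} = wolff_pendack:drop_backlog(Acks1, [CallId]),
    ok.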
src/wolff_producer.erl (30 changes: 18 additions & 12 deletions)
@@ -497,10 +497,13 @@ send_to_kafka(#{sent_reqs := SentReqs,
inflight_calls := InflightCalls,
replayq := Q,
config := #{max_batch_bytes := BytesLimit} = Config,
conn := Conn
conn := Conn,
pending_acks := PendingAcks
} = St0) ->
{NewQ, QAckRef, Items} =
replayq:pop(Q, #{bytes_limit => BytesLimit, count_limit => 999999999}),
IDs = lists:map(fun({ID, _}) -> ID end, get_calls_from_queue_items(Items)),
NewPendingAcks = wolff_pendack:move_backlog_to_inflight(PendingAcks, IDs),
wolff_metrics:queuing_set(Config, replayq:count(NewQ)),
NewSentReqsCount = SentReqsCount + 1,
NrOfCalls = count_calls(Items),
@@ -515,7 +518,8 @@
},
St2 = St1#{sent_reqs := queue:in(Sent, SentReqs),
sent_reqs_count := NewSentReqsCount,
inflight_calls := NewInflightCalls
inflight_calls := NewInflightCalls,
pending_acks := NewPendingAcks
},
ok = request_async(Conn, Req),
St3 = maybe_fake_kafka_ack(NoAck, Sent, St2),
@@ -534,7 +538,8 @@ is_send_ahead_allowed(#{config := #{max_send_ahead := Max},
is_idle(#{replayq := Q, sent_reqs_count := SentReqsCount}) ->
SentReqsCount =:= 0 andalso replayq:count(Q) =:= 0.

now_ts() ->erlang:system_time(millisecond).
now_ts() ->
erlang:system_time(millisecond).

make_queue_item(CallId, Batch) ->
?Q_ITEM(CallId, now_ts(), Batch).
@@ -755,7 +760,7 @@ ensure_delayed_reconnect(St, _Delay) ->
evaluate_pending_ack_funs(PendingAcks, [], _BaseOffset) -> PendingAcks;
evaluate_pending_ack_funs(PendingAcks, [{CallId, BatchSize} | Rest], BaseOffset) ->
NewPendingAcks =
case wolff_pendack:take(PendingAcks, CallId) of
case wolff_pendack:take_inflight(PendingAcks, CallId) of
{ok, AckCb, PendingAcks1} ->
ok = eval_ack_cb(AckCb, BaseOffset),
PendingAcks1;
@@ -838,10 +843,9 @@ collect_send_calls(Call, Bytes, ?EMPTY, Max) ->
Init = #{ts => now_ts(), bytes => 0, batch_r => []},
collect_send_calls(Call, Bytes, Init, Max);
collect_send_calls(Call, Bytes, Calls, Max) ->
#{ts := Ts, bytes := Bytes0, batch_r := BatchR} = Calls,
#{bytes := Bytes0, batch_r := BatchR} = Calls,
Sum = Bytes0 + Bytes,
R = Calls#{ts => Ts,
bytes => Sum,
R = Calls#{bytes => Sum,
batch_r => [Call | BatchR]
},
case Sum < Max of
@@ -913,7 +917,7 @@ enqueue_calls2(Calls,
%% keep callback funs in memory, do not serialize them into the queue because
%% saving anonymous function to disk may easily lead to badfun exception
%% in case of restart on newer version beam.
{CallId, PendingAcksOut} = wolff_pendack:insert(PendingAcksIn, ?ACK_CB(AckFun, Partition)),
{CallId, PendingAcksOut} = wolff_pendack:insert_backlog(PendingAcksIn, ?ACK_CB(AckFun, Partition)),
NewItems = [make_queue_item(CallId, Batch) | Items],
{NewItems, PendingAcksOut, Size + batch_bytes(Batch)}
end, {[], PendingAcks0, 0}, Calls),
@@ -959,12 +963,14 @@ handle_overflow(#{replayq := Q,
replayq:pop(Q, #{bytes_limit => Overflow, count_limit => 999999999}),
ok = replayq:ack(NewQ, QAckRef),
Calls = get_calls_from_queue_items(Items),
CallIDs = lists:map(fun({CallId, _BatchSize}) -> CallId end, Calls),
NrOfCalls = count_calls(Items),
wolff_metrics:dropped_queue_full_inc(Config, NrOfCalls),
wolff_metrics:dropped_inc(Config, NrOfCalls),
wolff_metrics:queuing_set(Config, replayq:count(NewQ)),
ok = maybe_log_discard(St, NrOfCalls),
NewPendingAcks = evaluate_pending_ack_funs(PendingAcks, Calls, ?buffer_overflow_discarded),
{CbList, NewPendingAcks} = wolff_pendack:drop_backlog(PendingAcks, CallIDs),
lists:foreach(fun(Cb) -> eval_ack_cb(Cb, ?buffer_overflow_discarded) end, CbList),
St#{replayq := NewQ, pending_acks := NewPendingAcks}.

%% use process dictionary for upgrade without restart
@@ -981,7 +987,7 @@ maybe_log_discard(St, Increment) ->
maybe_log_discard(#{topic := Topic, partition := Partition},
Increment,
#{last_ts := LastTs, last_cnt := LastCnt, acc_cnt := AccCnt}) ->
NowTs = erlang:system_time(millisecond),
NowTs = now_ts(),
NewAccCnt = AccCnt + Increment,
DiffCnt = NewAccCnt - LastCnt,
case NowTs - LastTs > ?MIN_DISCARD_LOG_INTERVAL of
@@ -1080,7 +1086,7 @@ maybe_log_discard_test_() ->
[ {"no-increment", fun() -> maybe_log_discard(undefined, 0) end}
, {"fake-last-old",
fun() ->
Ts0 = erlang:system_time(millisecond) - ?MIN_DISCARD_LOG_INTERVAL - 1,
Ts0 = now_ts() - ?MIN_DISCARD_LOG_INTERVAL - 1,
ok = put_overflow_log_state(Ts0, 2, 2),
ok = maybe_log_discard(#{topic => <<"a">>, partition => 0}, 1),
St = get_overflow_log_state(),
@@ -1089,7 +1095,7 @@ maybe_log_discard_test_() ->
end}
, {"fake-last-fresh",
fun() ->
Ts0 = erlang:system_time(millisecond),
Ts0 = now_ts(),
ok = put_overflow_log_state(Ts0, 2, 2),
ok = maybe_log_discard(#{topic => <<"a">>, partition => 0}, 2),
St = get_overflow_log_state(),
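The producer changes pair each replayq step with one of the new pendack operations: enqueue_calls2 parks the callback in the backlog, send_to_kafka moves exactly the popped call IDs to inflight, and handle_overflow discards backlog entries only. The rough sketch below (illustrative names, not code from this commit) shows why an overflow drop can no longer collide with an inflight request and raise unexpected_id.

%% Rough sketch (not from this commit): an overflow drop and an inflight ack
%% now operate on disjoint queues, so they cannot race on the same head entry.
overflow_vs_inflight_sketch() ->
    Acks0 = wolff_pendack:new(),
    {SentId, Acks1} = wolff_pendack:insert_backlog(Acks0, fun(_) -> sent_cb end),
    {DroppedId, Acks2} = wolff_pendack:insert_backlog(Acks1, fun(_) -> dropped_cb end),
    %% the first call's batch has been handed to Kafka (cf. send_to_kafka above)
    Acks3 = wolff_pendack:move_backlog_to_inflight(Acks2, [SentId]),
    %% replayq overflows and drops the second call (cf. handle_overflow above):
    %% only the backlog is searched, the inflight entry is left untouched
    {[_DroppedCb], Acks4} = wolff_pendack:drop_backlog(Acks3, [DroppedId]),
    %% the Kafka ack for the first call still finds its callback inflight
    {ok, _SentCb, _Acks5} = wolff_pendack:take_inflight(Acks4, SentId),
    ok.
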
test/wolff_dynamic_topics_tests.erl (4 changes: 2 additions & 2 deletions)
@@ -96,8 +96,8 @@ ack_cb_interlave_test() ->
%% inspect the pending acks
#{conn := Conn, pending_acks := Acks} = sys:get_state(Pid),
?assertEqual(N * 2, wolff_pendack:count(Acks)),
#{cbs := Cbs} = Acks,
?assertEqual(N * 2, queue:len(Cbs)),
#{inflight := Inflight, backlog := Backlog} = Acks,
?assertEqual(N * 2, queue:len(Backlog) + queue:len(Inflight)),
%% resume the connection
sys:resume(Conn),
%% wait for 2*N acks
test/wolff_tests.erl (67 changes: 66 additions & 1 deletion)
@@ -316,6 +316,69 @@ zero_ack_test() ->
ets:delete(CntrEventsTable),
deinstall_event_logging(?FUNCTION_NAME).

replayq_overflow_while_inflight_test() ->
ClientCfg = client_config(),
{ok, Client} = start_client(<<"client-1">>, ?HOSTS, ClientCfg),
Msg = fun(Value) -> #{key => <<>>, value => Value} end,
Batch = fun(Value) -> [Msg(Value), Msg(Value)] end,
BatchSize = wolff_producer:batch_bytes(Batch(<<"testdata1">>)),
ProducerCfg = #{max_batch_bytes => 1, %% ensure send one call at a time
replayq_max_total_bytes => BatchSize,
required_acks => all_isr,
max_send_ahead => 0 %% ensure one sent to kafka at a time
},
{ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg),
Pid = wolff_producers:lookup_producer(Producers, 0),
?assert(is_process_alive(Pid)),
TesterPid = self(),
ok = meck:new(kpro, [no_history, no_link, passthrough]),
meck:expect(kpro, send,
fun(_Conn, Req) ->
Payload = iolist_to_binary(lists:last(tuple_to_list(Req))),
[_, <<N, _/binary>>] = binary:split(Payload, <<"testdata">>),
TesterPid ! {sent_to_kafka, <<"testdata", N>>},
ok
end),
AckFun1 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_1, BaseOffset}, ok end,
AckFun2 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_2, BaseOffset}, ok end,
AckFun3 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_3, BaseOffset}, ok end,
SendF = fun(AckFun, Value) -> wolff:send(Producers, Batch(Value), AckFun) end,
%% send 3 batches: the 1st will be inflight, the 2nd overflows (kicked out by the 3rd)
proc_lib:spawn_link(fun() -> SendF(AckFun1, <<"testdata1">>) end),
proc_lib:spawn_link(fun() -> timer:sleep(5), SendF(AckFun2, <<"testdata2">>) end),
proc_lib:spawn_link(fun() -> timer:sleep(10), SendF(AckFun3, <<"testdata3">>) end),
try
%% the 1st batch is sent to Kafka, but no reply, so no ack
receive
{sent_to_kafka, <<"testdata1">>} -> ok;
{ack_1, _} -> error(unexpected)
after
2000 ->
error(timeout)
end,
%% the 2nd batch should be dropped (pushed out by the 3rd)
receive
{ack_2, buffer_overflow_discarded} -> ok;
{sent_to_kafka, <<"testdata2">>} -> error(unexpected);
{sent_to_kafka, Offset2} -> error({unexpected, Offset2})
after
2000 ->
io:format(user, "~p~n", [sys:get_state(Pid)]),
error(timeout)
end,
%% the 3rd batch should stay in the queue
receive
{ack_3, Any} -> error({unexpected, Any})
after
100 ->
ok
end
after
meck:unload(kpro),
ok = wolff:stop_producers(Producers),
ok = stop_client(Client)
end.

replayq_overflow_test() ->
CntrEventsTable = ets:new(cntr_events, [public]),
install_event_logging(?FUNCTION_NAME, CntrEventsTable, false),
@@ -928,7 +991,9 @@ install_event_logging(TestCaseName, EventRecordTable, LogEvents) ->
fun wolff_tests:handle_telemetry_event/4,
#{record_table => EventRecordTable,
log_events => LogEvents}
).
),
timer:sleep(100),
ok.

telemetry_events() ->
[
