Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pending ack handling for buffer overflow drops #81

Merged
merged 5 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
* 4.0.2
- Fix dynamic topic producer initialization failure handling.
- Fix dynamic topic producer initialization failure handling (introduced in 3.0.0).
- Fix `unexpected_id` crash when replayq overflow (introduced in 4.0.1).

* 4.0.1
- Minimize callback context for sync call.
Expand Down
76 changes: 54 additions & 22 deletions src/wolff_pendack.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@
%% time in microseconds.
-module(wolff_pendack).

-export([new/0, count/1, insert/2, take/2]).
-export([new/0, count/1, insert_backlog/2]).
-export([take_inflight/2, drop_backlog/2, move_backlog_to_inflight/2]).

-export_type([acks/0]).

-type call_id() :: pos_integer().
-type key() :: call_id() | {call_id(), call_id()}.
-type cb() :: term().
-opaque acks() :: #{next_id := integer(),
cbs := queue:queue({key(), cb()})}.
backlog := queue:queue({key(), cb()}),
inflight := queue:queue({key(), cb()}),
count := non_neg_integer()
}.

%% @doc Initialize a new data structure.
new() ->
Expand All @@ -38,26 +42,21 @@ new() ->
%% the process crashed or node restarted.
Now = erlang:system_time(microsecond),
#{next_id => Now,
cbs => queue:new()
backlog => queue:new(),
inflight => queue:new(),
count => 0
}.

%% @doc count the total number of pending acks.
-spec count(acks()) -> non_neg_integer().
count(#{cbs := Cbs}) ->
sum(queue:to_list(Cbs), 0).

sum([], Acc) ->
Acc;
sum([{CallId, _} | Rest], Acc) when is_integer(CallId) ->
sum(Rest, Acc + 1);
sum([{{MinCallId, MaxCallId}, _} | Rest], Acc) ->
sum(Rest, Acc + MaxCallId - MinCallId + 1).

%% @doc insert a callback into the data structure.
-spec insert(acks(), cb()) -> {call_id(), acks()}.
insert(#{next_id := Id, cbs := Cbs} = X, Cb) ->
count(#{count := Count}) ->
Count.

%% @doc insert a callback into the backlog queue.
-spec insert_backlog(acks(), cb()) -> {call_id(), acks()}.
insert_backlog(#{next_id := Id, backlog := Cbs, count := Count} = X, Cb) ->
NewCbs = insert_cb(Cbs, Id, Cb),
{Id, X#{next_id => Id + 1, cbs => NewCbs}}.
{Id, X#{next_id => Id + 1, backlog => NewCbs, count => Count + 1}}.

insert_cb(Cbs, Id, Cb) ->
case queue:out_r(Cbs) of
Expand All @@ -84,17 +83,17 @@ expand_id({MinId, MaxId}, Id) ->
Id =:= MaxId + 1 orelse error({unexpected_id, {MinId, MaxId}, Id}),
{MinId, Id}.

%% @doc Take the callback for a given call ID.
%% The ID is expected to be the oldest in the queue.
%% @doc Take the callback from the inflight queue.
%% The ID is expected to be the oldest in the inflight queue.
%% Return the callback and the updated data structure.
-spec take(acks(), call_id()) -> {ok, cb(), acks()} | false.
take(#{cbs := Cbs} = X, Id) ->
-spec take_inflight(acks(), call_id()) -> {ok, cb(), acks()} | false.
take_inflight(#{inflight := Cbs, count := Count} = X, Id) ->
case take1(Cbs, Id) of
false ->
%% stale ack
false;
{ok, Cb, Cbs1} ->
{ok, Cb, X#{cbs => Cbs1}}
{ok, Cb, X#{inflight => Cbs1, count => Count - 1}}
end.

take1(Cbs0, Id) ->
Expand All @@ -120,3 +119,36 @@ take2(Cbs, {MinId, MaxId}, Cb, Id) when Id =:= MinId ->
end;
take2(_Cbs, {MinId, MaxId}, _Cb, Id) ->
error(#{cause => unexpected_id, min => MinId, max => MaxId, got => Id}).

%% @doc Drop calls from the head (older end) of the backlog queue.
%% Return list of callbacks and the remaining.
-spec drop_backlog(acks(), [call_id()]) -> {[cb()], acks()}.
drop_backlog(X, Ids) ->
drop1(X, Ids, []).

drop1(X, [], Acc) ->
{lists:reverse(Acc), X};
drop1(#{backlog := Cbs0, count := Count} = X, [Id | Ids], Acc) ->
case take1(Cbs0, Id) of
false ->
drop1(X, Ids, Acc);
{ok, Cb, Cbs} ->
drop1(X#{backlog := Cbs, count := Count - 1}, Ids, [Cb | Acc])
end.

%% @doc Move a list of calls from the head of the backlog queue
%% and push it to the inflight queue.
-spec move_backlog_to_inflight(acks(), [call_id()]) -> acks().
move_backlog_to_inflight(X, []) ->
X;
move_backlog_to_inflight(X, [Id | Ids]) ->
move_backlog_to_inflight(move1(X, Id), Ids).

move1(#{backlog := Backlog0, inflight := Inflight0} = X, Id) ->
case take1(Backlog0, Id) of
false ->
X;
{ok, Cb, Backlog} ->
Inflight = insert_cb(Inflight0, Id, Cb),
X#{backlog := Backlog, inflight := Inflight}
end.
30 changes: 18 additions & 12 deletions src/wolff_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,13 @@ send_to_kafka(#{sent_reqs := SentReqs,
inflight_calls := InflightCalls,
replayq := Q,
config := #{max_batch_bytes := BytesLimit} = Config,
conn := Conn
conn := Conn,
pending_acks := PendingAcks
} = St0) ->
{NewQ, QAckRef, Items} =
replayq:pop(Q, #{bytes_limit => BytesLimit, count_limit => 999999999}),
IDs = lists:map(fun({ID, _}) -> ID end, get_calls_from_queue_items(Items)),
NewPendingAcks = wolff_pendack:move_backlog_to_inflight(PendingAcks, IDs),
wolff_metrics:queuing_set(Config, replayq:count(NewQ)),
NewSentReqsCount = SentReqsCount + 1,
NrOfCalls = count_calls(Items),
Expand All @@ -515,7 +518,8 @@ send_to_kafka(#{sent_reqs := SentReqs,
},
St2 = St1#{sent_reqs := queue:in(Sent, SentReqs),
sent_reqs_count := NewSentReqsCount,
inflight_calls := NewInflightCalls
inflight_calls := NewInflightCalls,
pending_acks := NewPendingAcks
},
ok = request_async(Conn, Req),
St3 = maybe_fake_kafka_ack(NoAck, Sent, St2),
Expand All @@ -534,7 +538,8 @@ is_send_ahead_allowed(#{config := #{max_send_ahead := Max},
is_idle(#{replayq := Q, sent_reqs_count := SentReqsCount}) ->
SentReqsCount =:= 0 andalso replayq:count(Q) =:= 0.

now_ts() ->erlang:system_time(millisecond).
now_ts() ->
erlang:system_time(millisecond).

make_queue_item(CallId, Batch) ->
?Q_ITEM(CallId, now_ts(), Batch).
Expand Down Expand Up @@ -755,7 +760,7 @@ ensure_delayed_reconnect(St, _Delay) ->
evaluate_pending_ack_funs(PendingAcks, [], _BaseOffset) -> PendingAcks;
evaluate_pending_ack_funs(PendingAcks, [{CallId, BatchSize} | Rest], BaseOffset) ->
NewPendingAcks =
case wolff_pendack:take(PendingAcks, CallId) of
case wolff_pendack:take_inflight(PendingAcks, CallId) of
{ok, AckCb, PendingAcks1} ->
ok = eval_ack_cb(AckCb, BaseOffset),
PendingAcks1;
Expand Down Expand Up @@ -838,10 +843,9 @@ collect_send_calls(Call, Bytes, ?EMPTY, Max) ->
Init = #{ts => now_ts(), bytes => 0, batch_r => []},
collect_send_calls(Call, Bytes, Init, Max);
collect_send_calls(Call, Bytes, Calls, Max) ->
#{ts := Ts, bytes := Bytes0, batch_r := BatchR} = Calls,
#{bytes := Bytes0, batch_r := BatchR} = Calls,
Sum = Bytes0 + Bytes,
R = Calls#{ts => Ts,
bytes => Sum,
R = Calls#{bytes => Sum,
batch_r => [Call | BatchR]
},
case Sum < Max of
Expand Down Expand Up @@ -913,7 +917,7 @@ enqueue_calls2(Calls,
%% keep callback funs in memory, do not seralize it into queue because
%% saving anonymous function to disk may easily lead to badfun exception
%% in case of restart on newer version beam.
{CallId, PendingAcksOut} = wolff_pendack:insert(PendingAcksIn, ?ACK_CB(AckFun, Partition)),
{CallId, PendingAcksOut} = wolff_pendack:insert_backlog(PendingAcksIn, ?ACK_CB(AckFun, Partition)),
NewItems = [make_queue_item(CallId, Batch) | Items],
{NewItems, PendingAcksOut, Size + batch_bytes(Batch)}
end, {[], PendingAcks0, 0}, Calls),
Expand Down Expand Up @@ -959,12 +963,14 @@ handle_overflow(#{replayq := Q,
replayq:pop(Q, #{bytes_limit => Overflow, count_limit => 999999999}),
ok = replayq:ack(NewQ, QAckRef),
Calls = get_calls_from_queue_items(Items),
CallIDs = lists:map(fun({CallId, _BatchSize}) -> CallId end, Calls),
NrOfCalls = count_calls(Items),
wolff_metrics:dropped_queue_full_inc(Config, NrOfCalls),
wolff_metrics:dropped_inc(Config, NrOfCalls),
wolff_metrics:queuing_set(Config, replayq:count(NewQ)),
ok = maybe_log_discard(St, NrOfCalls),
NewPendingAcks = evaluate_pending_ack_funs(PendingAcks, Calls, ?buffer_overflow_discarded),
{CbList, NewPendingAcks} = wolff_pendack:drop_backlog(PendingAcks, CallIDs),
lists:foreach(fun(Cb) -> eval_ack_cb(Cb, ?buffer_overflow_discarded) end, CbList),
St#{replayq := NewQ, pending_acks := NewPendingAcks}.

%% use process dictionary for upgrade without restart
Expand All @@ -981,7 +987,7 @@ maybe_log_discard(St, Increment) ->
maybe_log_discard(#{topic := Topic, partition := Partition},
Increment,
#{last_ts := LastTs, last_cnt := LastCnt, acc_cnt := AccCnt}) ->
NowTs = erlang:system_time(millisecond),
NowTs = now_ts(),
NewAccCnt = AccCnt + Increment,
DiffCnt = NewAccCnt - LastCnt,
case NowTs - LastTs > ?MIN_DISCARD_LOG_INTERVAL of
Expand Down Expand Up @@ -1080,7 +1086,7 @@ maybe_log_discard_test_() ->
[ {"no-increment", fun() -> maybe_log_discard(undefined, 0) end}
, {"fake-last-old",
fun() ->
Ts0 = erlang:system_time(millisecond) - ?MIN_DISCARD_LOG_INTERVAL - 1,
Ts0 = now_ts() - ?MIN_DISCARD_LOG_INTERVAL - 1,
ok = put_overflow_log_state(Ts0, 2, 2),
ok = maybe_log_discard(#{topic => <<"a">>, partition => 0}, 1),
St = get_overflow_log_state(),
Expand All @@ -1089,7 +1095,7 @@ maybe_log_discard_test_() ->
end}
, {"fake-last-fresh",
fun() ->
Ts0 = erlang:system_time(millisecond),
Ts0 = now_ts(),
ok = put_overflow_log_state(Ts0, 2, 2),
ok = maybe_log_discard(#{topic => <<"a">>, partition => 0}, 2),
St = get_overflow_log_state(),
Expand Down
4 changes: 2 additions & 2 deletions test/wolff_dynamic_topics_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ ack_cb_interlave_test() ->
%% inspect the pending acks
#{conn := Conn, pending_acks := Acks} = sys:get_state(Pid),
?assertEqual(N * 2, wolff_pendack:count(Acks)),
#{cbs := Cbs} = Acks,
?assertEqual(N * 2, queue:len(Cbs)),
#{inflight := Inflight, backlog := Backlog} = Acks,
?assertEqual(N * 2, queue:len(Backlog) + queue:len(Inflight)),
%% resume the connection
sys:resume(Conn),
%% wait for 2*N acks
Expand Down
67 changes: 66 additions & 1 deletion test/wolff_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,69 @@ zero_ack_test() ->
ets:delete(CntrEventsTable),
deinstall_event_logging(?FUNCTION_NAME).

replayq_overflow_while_inflight_test() ->
ClientCfg = client_config(),
{ok, Client} = start_client(<<"client-1">>, ?HOSTS, ClientCfg),
Msg = fun(Value) -> #{key => <<>>, value => Value} end,
Batch = fun(Value) -> [Msg(Value), Msg(Value)] end,
BatchSize = wolff_producer:batch_bytes(Batch(<<"testdata1">>)),
ProducerCfg = #{max_batch_bytes => 1, %% ensure send one call at a time
replayq_max_total_bytes => BatchSize,
required_acks => all_isr,
max_send_ahead => 0 %% ensure one sent to kafka at a time
},
{ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg),
Pid = wolff_producers:lookup_producer(Producers, 0),
?assert(is_process_alive(Pid)),
TesterPid = self(),
ok = meck:new(kpro, [no_history, no_link, passthrough]),
meck:expect(kpro, send,
fun(_Conn, Req) ->
Payload = iolist_to_binary(lists:last(tuple_to_list(Req))),
[_, <<N, _/binary>>] = binary:split(Payload, <<"testdata">>),
TesterPid ! {sent_to_kafka, <<"testdata", N>>},
ok
end),
AckFun1 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_1, BaseOffset}, ok end,
AckFun2 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_2, BaseOffset}, ok end,
AckFun3 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_3, BaseOffset}, ok end,
SendF = fun(AckFun, Value) -> wolff:send(Producers, Batch(Value), AckFun) end,
%% send 3 batches first will be inflight, 2nd is overflow (kicked out by the 3rd)
proc_lib:spawn_link(fun() -> SendF(AckFun1, <<"testdata1">>) end),
proc_lib:spawn_link(fun() -> timer:sleep(5), SendF(AckFun2, <<"testdata2">>) end),
proc_lib:spawn_link(fun() -> timer:sleep(10), SendF(AckFun3, <<"testdata3">>) end),
try
%% the 1st batch is sent to Kafka, but no reply, so no ack
receive
{sent_to_kafka, <<"testdata1">>} -> ok;
{ack_1, _} -> error(unexpected)
after
2000 ->
error(timeout)
end,
%% the 2nd batch should be dropped (pushed out by the 3rd)
receive
{ack_2, buffer_overflow_discarded} -> ok;
{sent_to_kafka, <<"testdata2">>} -> error(unexpected);
{sent_to_kafka, Offset2} -> error({unexpected, Offset2})
after
2000 ->
io:format(user, "~p~n", [sys:get_state(Pid)]),
error(timeout)
end,
%% the 3rd batch should stay in the queue
receive
{ack_3, Any} -> error({unexpected, Any})
after
100 ->
ok
end
after
meck:unload(kpro),
ok = wolff:stop_producers(Producers),
ok = stop_client(Client)
end.

replayq_overflow_test() ->
CntrEventsTable = ets:new(cntr_events, [public]),
install_event_logging(?FUNCTION_NAME, CntrEventsTable, false),
Expand Down Expand Up @@ -928,7 +991,9 @@ install_event_logging(TestCaseName, EventRecordTable, LogEvents) ->
fun wolff_tests:handle_telemetry_event/4,
#{record_table => EventRecordTable,
log_events => LogEvents}
).
),
timer:sleep(100),
ok.

telemetry_events() ->
[
Expand Down
Loading