From 17909597a421e1fe517702221733d5b690dfbc36 Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 9 Oct 2024 16:05:41 +0200 Subject: [PATCH 1/5] refactor: call now_ts --- src/wolff_producer.erl | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/wolff_producer.erl b/src/wolff_producer.erl index 7a5f02b..a3fa631 100644 --- a/src/wolff_producer.erl +++ b/src/wolff_producer.erl @@ -534,7 +534,8 @@ is_send_ahead_allowed(#{config := #{max_send_ahead := Max}, is_idle(#{replayq := Q, sent_reqs_count := SentReqsCount}) -> SentReqsCount =:= 0 andalso replayq:count(Q) =:= 0. -now_ts() ->erlang:system_time(millisecond). +now_ts() -> + erlang:system_time(millisecond). make_queue_item(CallId, Batch) -> ?Q_ITEM(CallId, now_ts(), Batch). @@ -838,10 +839,9 @@ collect_send_calls(Call, Bytes, ?EMPTY, Max) -> Init = #{ts => now_ts(), bytes => 0, batch_r => []}, collect_send_calls(Call, Bytes, Init, Max); collect_send_calls(Call, Bytes, Calls, Max) -> - #{ts := Ts, bytes := Bytes0, batch_r := BatchR} = Calls, + #{bytes := Bytes0, batch_r := BatchR} = Calls, Sum = Bytes0 + Bytes, - R = Calls#{ts => Ts, - bytes => Sum, + R = Calls#{bytes => Sum, batch_r => [Call | BatchR] }, case Sum < Max of @@ -981,7 +981,7 @@ maybe_log_discard(St, Increment) -> maybe_log_discard(#{topic := Topic, partition := Partition}, Increment, #{last_ts := LastTs, last_cnt := LastCnt, acc_cnt := AccCnt}) -> - NowTs = erlang:system_time(millisecond), + NowTs = now_ts(), NewAccCnt = AccCnt + Increment, DiffCnt = NewAccCnt - LastCnt, case NowTs - LastTs > ?MIN_DISCARD_LOG_INTERVAL of @@ -1080,7 +1080,7 @@ maybe_log_discard_test_() -> [ {"no-increment", fun() -> maybe_log_discard(undefined, 0) end} , {"fake-last-old", fun() -> - Ts0 = erlang:system_time(millisecond) - ?MIN_DISCARD_LOG_INTERVAL - 1, + Ts0 = now_ts() - ?MIN_DISCARD_LOG_INTERVAL - 1, ok = put_overflow_log_state(Ts0, 2, 2), ok = maybe_log_discard(#{topic => <<"a">>, partition => 0}, 1), St = get_overflow_log_state(), @@ -1089,7 +1089,7 @@ maybe_log_discard_test_() -> end} , {"fake-last-fresh", fun() -> - Ts0 = erlang:system_time(millisecond), + Ts0 = now_ts(), ok = put_overflow_log_state(Ts0, 2, 2), ok = maybe_log_discard(#{topic => <<"a">>, partition => 0}, 2), St = get_overflow_log_state(), From b84c8a42294ad2dc504024fa84fe273f3e45ced0 Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 9 Oct 2024 18:57:59 +0200 Subject: [PATCH 2/5] fix: handle dropped calls a bug introduced in 4.0.0: when replayq overflow has to drop calls while there are inflight requests, the producer may crash on the defensive assertion. --- src/wolff_pendack.erl | 76 ++++++++++++++++++++--------- src/wolff_producer.erl | 16 ++++-- test/wolff_dynamic_topics_tests.erl | 4 +- test/wolff_tests.erl | 67 ++++++++++++++++++++++++- 4 files changed, 133 insertions(+), 30 deletions(-) diff --git a/src/wolff_pendack.erl b/src/wolff_pendack.erl index f5867a3..8b2b493 100644 --- a/src/wolff_pendack.erl +++ b/src/wolff_pendack.erl @@ -21,7 +21,8 @@ %% time in microseconds. -module(wolff_pendack). --export([new/0, count/1, insert/2, take/2]). +-export([new/0, count/1, insert_backlog/2]). +-export([take_inflight/2, drop_backlog/2, move_backlog_to_inflight/2]). -export_type([acks/0]). @@ -29,7 +30,10 @@ -type key() :: call_id() | {call_id(), call_id()}. -type cb() :: term(). -opaque acks() :: #{next_id := integer(), - cbs := queue:queue({key(), cb()})}. + backlog := queue:queue({key(), cb()}), + inflight := queue:queue({key(), cb()}), + count := non_neg_integer() + }. %% @doc Initialize a new data structure. new() -> @@ -38,26 +42,21 @@ new() -> %% the process crashed or node restarted. Now = erlang:system_time(microsecond), #{next_id => Now, - cbs => queue:new() + backlog => queue:new(), + inflight => queue:new(), + count => 0 }. %% @doc count the total number of pending acks. -spec count(acks()) -> non_neg_integer(). -count(#{cbs := Cbs}) -> - sum(queue:to_list(Cbs), 0). - -sum([], Acc) -> - Acc; -sum([{CallId, _} | Rest], Acc) when is_integer(CallId) -> - sum(Rest, Acc + 1); -sum([{{MinCallId, MaxCallId}, _} | Rest], Acc) -> - sum(Rest, Acc + MaxCallId - MinCallId + 1). - -%% @doc insert a callback into the data structure. --spec insert(acks(), cb()) -> {call_id(), acks()}. -insert(#{next_id := Id, cbs := Cbs} = X, Cb) -> +count(#{count := Count}) -> + Count. + +%% @doc insert a callback into the backlog queue. +-spec insert_backlog(acks(), cb()) -> {call_id(), acks()}. +insert_backlog(#{next_id := Id, backlog := Cbs, count := Count} = X, Cb) -> NewCbs = insert_cb(Cbs, Id, Cb), - {Id, X#{next_id => Id + 1, cbs => NewCbs}}. + {Id, X#{next_id => Id + 1, backlog => NewCbs, count => Count + 1}}. insert_cb(Cbs, Id, Cb) -> case queue:out_r(Cbs) of @@ -84,17 +83,17 @@ expand_id({MinId, MaxId}, Id) -> Id =:= MaxId + 1 orelse error({unexpected_id, {MinId, MaxId}, Id}), {MinId, Id}. -%% @doc Take the callback for a given call ID. -%% The ID is expected to be the oldest in the queue. +%% @doc Take the callback from the inflight queue. +%% The ID is expected to be the oldest in the inflight queue. %% Return the callback and the updated data structure. --spec take(acks(), call_id()) -> {ok, cb(), acks()} | false. -take(#{cbs := Cbs} = X, Id) -> +-spec take_inflight(acks(), call_id()) -> {ok, cb(), acks()} | false. +take_inflight(#{inflight := Cbs, count := Count} = X, Id) -> case take1(Cbs, Id) of false -> %% stale ack false; {ok, Cb, Cbs1} -> - {ok, Cb, X#{cbs => Cbs1}} + {ok, Cb, X#{inflight => Cbs1, count => Count - 1}} end. take1(Cbs0, Id) -> @@ -120,3 +119,36 @@ take2(Cbs, {MinId, MaxId}, Cb, Id) when Id =:= MinId -> end; take2(_Cbs, {MinId, MaxId}, _Cb, Id) -> error(#{cause => unexpected_id, min => MinId, max => MaxId, got => Id}). + +%% @doc Drop calls from the head (older end) of the backlog queue. +%% Return list of callbacks and the remaining. +-spec drop_backlog(acks(), [call_id()]) -> {[cb()], acks()}. +drop_backlog(X, Ids) -> + drop1(X, Ids, []). + +drop1(X, [], Acc) -> + {lists:reverse(Acc), X}; +drop1(#{backlog := Cbs0, count := Count} = X, [Id | Ids], Acc) -> + case take1(Cbs0, Id) of + false -> + drop1(X, Ids, Acc); + {ok, Cb, Cbs} -> + drop1(X#{backlog => Cbs, count => Count - 1}, Ids, [Cb | Acc]) + end. + +%% @doc Move a list of calls from the head of the backlog queue +%% and push it to the inflight queue. +-spec move_backlog_to_inflight(acks(), [call_id()]) -> acks(). +move_backlog_to_inflight(X, []) -> + X; +move_backlog_to_inflight(X, [Id | Ids]) -> + move_backlog_to_inflight(move1(X, Id), Ids). + +move1(#{backlog := Backlog0, inflight := Inflight0} = X, Id) -> + case take1(Backlog0, Id) of + false -> + X; + {ok, Cb, Backlog} -> + Inflight = insert_cb(Inflight0, Id, Cb), + X#{backlog => Backlog, inflight => Inflight} + end. diff --git a/src/wolff_producer.erl b/src/wolff_producer.erl index a3fa631..adaaedd 100644 --- a/src/wolff_producer.erl +++ b/src/wolff_producer.erl @@ -497,10 +497,13 @@ send_to_kafka(#{sent_reqs := SentReqs, inflight_calls := InflightCalls, replayq := Q, config := #{max_batch_bytes := BytesLimit} = Config, - conn := Conn + conn := Conn, + pending_acks := PendingAcks } = St0) -> {NewQ, QAckRef, Items} = replayq:pop(Q, #{bytes_limit => BytesLimit, count_limit => 999999999}), + IDs = lists:map(fun({ID, _}) -> ID end, get_calls_from_queue_items(Items)), + NewPendingAcks = wolff_pendack:move_backlog_to_inflight(PendingAcks, IDs), wolff_metrics:queuing_set(Config, replayq:count(NewQ)), NewSentReqsCount = SentReqsCount + 1, NrOfCalls = count_calls(Items), @@ -515,7 +518,8 @@ send_to_kafka(#{sent_reqs := SentReqs, }, St2 = St1#{sent_reqs := queue:in(Sent, SentReqs), sent_reqs_count := NewSentReqsCount, - inflight_calls := NewInflightCalls + inflight_calls := NewInflightCalls, + pending_acks := NewPendingAcks }, ok = request_async(Conn, Req), St3 = maybe_fake_kafka_ack(NoAck, Sent, St2), @@ -756,7 +760,7 @@ ensure_delayed_reconnect(St, _Delay) -> evaluate_pending_ack_funs(PendingAcks, [], _BaseOffset) -> PendingAcks; evaluate_pending_ack_funs(PendingAcks, [{CallId, BatchSize} | Rest], BaseOffset) -> NewPendingAcks = - case wolff_pendack:take(PendingAcks, CallId) of + case wolff_pendack:take_inflight(PendingAcks, CallId) of {ok, AckCb, PendingAcks1} -> ok = eval_ack_cb(AckCb, BaseOffset), PendingAcks1; @@ -913,7 +917,7 @@ enqueue_calls2(Calls, %% keep callback funs in memory, do not seralize it into queue because %% saving anonymous function to disk may easily lead to badfun exception %% in case of restart on newer version beam. - {CallId, PendingAcksOut} = wolff_pendack:insert(PendingAcksIn, ?ACK_CB(AckFun, Partition)), + {CallId, PendingAcksOut} = wolff_pendack:insert_backlog(PendingAcksIn, ?ACK_CB(AckFun, Partition)), NewItems = [make_queue_item(CallId, Batch) | Items], {NewItems, PendingAcksOut, Size + batch_bytes(Batch)} end, {[], PendingAcks0, 0}, Calls), @@ -959,12 +963,14 @@ handle_overflow(#{replayq := Q, replayq:pop(Q, #{bytes_limit => Overflow, count_limit => 999999999}), ok = replayq:ack(NewQ, QAckRef), Calls = get_calls_from_queue_items(Items), + CallIDs = lists:map(fun({CallId, _BatchSize}) -> CallId end, Calls), NrOfCalls = count_calls(Items), wolff_metrics:dropped_queue_full_inc(Config, NrOfCalls), wolff_metrics:dropped_inc(Config, NrOfCalls), wolff_metrics:queuing_set(Config, replayq:count(NewQ)), ok = maybe_log_discard(St, NrOfCalls), - NewPendingAcks = evaluate_pending_ack_funs(PendingAcks, Calls, ?buffer_overflow_discarded), + {CbList, NewPendingAcks} = wolff_pendack:drop_backlog(PendingAcks, CallIDs), + lists:foreach(fun(Cb) -> eval_ack_cb(Cb, ?buffer_overflow_discarded) end, CbList), St#{replayq := NewQ, pending_acks := NewPendingAcks}. %% use process dictionary for upgrade without restart diff --git a/test/wolff_dynamic_topics_tests.erl b/test/wolff_dynamic_topics_tests.erl index 4fcdb1c..f07fb9c 100644 --- a/test/wolff_dynamic_topics_tests.erl +++ b/test/wolff_dynamic_topics_tests.erl @@ -96,8 +96,8 @@ ack_cb_interlave_test() -> %% inspect the pending acks #{conn := Conn, pending_acks := Acks} = sys:get_state(Pid), ?assertEqual(N * 2, wolff_pendack:count(Acks)), - #{cbs := Cbs} = Acks, - ?assertEqual(N * 2, queue:len(Cbs)), + #{inflight := Inflight, backlog := Backlog} = Acks, + ?assertEqual(N * 2, queue:len(Backlog) + queue:len(Inflight)), %% resume the connection sys:resume(Conn), %% wait for 2*N acks diff --git a/test/wolff_tests.erl b/test/wolff_tests.erl index 9884f95..e0c4892 100644 --- a/test/wolff_tests.erl +++ b/test/wolff_tests.erl @@ -316,6 +316,69 @@ zero_ack_test() -> ets:delete(CntrEventsTable), deinstall_event_logging(?FUNCTION_NAME). +replayq_overflow_while_inflight_test() -> + ClientCfg = client_config(), + {ok, Client} = start_client(<<"client-1">>, ?HOSTS, ClientCfg), + Msg = fun(Value) -> #{key => <<>>, value => Value} end, + Batch = fun(Value) -> [Msg(Value), Msg(Value)] end, + BatchSize = wolff_producer:batch_bytes(Batch(<<"testdata1">>)), + ProducerCfg = #{max_batch_bytes => 1, %% ensure send one call at a time + replayq_max_total_bytes => BatchSize, + required_acks => all_isr, + max_send_ahead => 0 %% ensure one sent to kafka at a time + }, + {ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg), + Pid = wolff_producers:lookup_producer(Producers, 0), + ?assert(is_process_alive(Pid)), + TesterPid = self(), + ok = meck:new(kpro, [non_strict, no_history, no_link, passthrough]), + meck:expect(kpro, send, + fun(_Conn, Req) -> + Payload = iolist_to_binary(lists:last(tuple_to_list(Req))), + [_, <>] = binary:split(Payload, <<"testdata">>), + TesterPid ! {sent_to_kafka, <<"testdata", N>>}, + ok + end), + AckFun1 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_1, BaseOffset}, ok end, + AckFun2 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_2, BaseOffset}, ok end, + AckFun3 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_3, BaseOffset}, ok end, + SendF = fun(AckFun, Value) -> wolff:send(Producers, Batch(Value), AckFun) end, + %% send 3 batches first will be inflight, 2nd is overflow (kicked out by the 3rd) + proc_lib:spawn_link(fun() -> SendF(AckFun1, <<"testdata1">>) end), + proc_lib:spawn_link(fun() -> timer:sleep(5), SendF(AckFun2, <<"testdata2">>) end), + proc_lib:spawn_link(fun() -> timer:sleep(10), SendF(AckFun3, <<"testdata3">>) end), + try + %% the 1st batch is sent to Kafka, but no reply, so no ack + receive + {sent_to_kafka, <<"testdata1">>} -> ok; + {ack_1, _} -> error(unexpected) + after + 2000 -> + error(timeout) + end, + %% the 2nd batch should be dropped (pushed out by the 3rd) + receive + {ack_2, buffer_overflow_discarded} -> ok; + {sent_to_kafka, <<"testdata2">>} -> error(unexpected); + {sent_to_kafka, Offset2} -> error({unexpected, Offset2}) + after + 2000 -> + io:format(user, "~p~n", [sys:get_state(Pid)]), + error(timeout) + end, + %% the 3rd batch should stay in the queue + receive + {ack_3, Any} -> error({unexpected, Any}) + after + 100 -> + ok + end + after + meck:unload(kpro), + ok = wolff:stop_producers(Producers), + ok = stop_client(Client) + end. + replayq_overflow_test() -> CntrEventsTable = ets:new(cntr_events, [public]), install_event_logging(?FUNCTION_NAME, CntrEventsTable, false), @@ -928,7 +991,9 @@ install_event_logging(TestCaseName, EventRecordTable, LogEvents) -> fun wolff_tests:handle_telemetry_event/4, #{record_table => EventRecordTable, log_events => LogEvents} - ). + ), + timer:sleep(100), + ok. telemetry_events() -> [ From 86603bce8963ddb381b579f6e46ab9428b965e65 Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 9 Oct 2024 19:12:26 +0200 Subject: [PATCH 3/5] docs: update changelog --- changelog.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/changelog.md b/changelog.md index 6994872..8883ac3 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,6 @@ * 4.0.2 - - Fix dynamic topic producer initialization failure handling. + - Fix dynamic topic producer initialization failure handling (introduced in 3.0.0). + - Fix `unexpected_id` crash when replayq overflow. * 4.0.1 - Minimize callback context for sync call. From 9347ec14feeded3fd3b7686f0181065031d87c5b Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 9 Oct 2024 19:31:20 +0200 Subject: [PATCH 4/5] docs: update changelog --- changelog.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.md b/changelog.md index 8883ac3..5c99f92 100644 --- a/changelog.md +++ b/changelog.md @@ -1,6 +1,6 @@ * 4.0.2 - Fix dynamic topic producer initialization failure handling (introduced in 3.0.0). - - Fix `unexpected_id` crash when replayq overflow. + - Fix `unexpected_id` crash when replayq overflow (introduced in 4.0.1). * 4.0.1 - Minimize callback context for sync call. From a67da3b0d127c1a5db7f880e832122d26bc4c49c Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 9 Oct 2024 19:49:22 +0200 Subject: [PATCH 5/5] chore: address review comments --- src/wolff_pendack.erl | 4 ++-- test/wolff_tests.erl | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/wolff_pendack.erl b/src/wolff_pendack.erl index 8b2b493..9e16c6a 100644 --- a/src/wolff_pendack.erl +++ b/src/wolff_pendack.erl @@ -133,7 +133,7 @@ drop1(#{backlog := Cbs0, count := Count} = X, [Id | Ids], Acc) -> false -> drop1(X, Ids, Acc); {ok, Cb, Cbs} -> - drop1(X#{backlog => Cbs, count => Count - 1}, Ids, [Cb | Acc]) + drop1(X#{backlog := Cbs, count := Count - 1}, Ids, [Cb | Acc]) end. %% @doc Move a list of calls from the head of the backlog queue @@ -150,5 +150,5 @@ move1(#{backlog := Backlog0, inflight := Inflight0} = X, Id) -> X; {ok, Cb, Backlog} -> Inflight = insert_cb(Inflight0, Id, Cb), - X#{backlog => Backlog, inflight => Inflight} + X#{backlog := Backlog, inflight := Inflight} end. diff --git a/test/wolff_tests.erl b/test/wolff_tests.erl index e0c4892..e65e538 100644 --- a/test/wolff_tests.erl +++ b/test/wolff_tests.erl @@ -331,7 +331,7 @@ replayq_overflow_while_inflight_test() -> Pid = wolff_producers:lookup_producer(Producers, 0), ?assert(is_process_alive(Pid)), TesterPid = self(), - ok = meck:new(kpro, [non_strict, no_history, no_link, passthrough]), + ok = meck:new(kpro, [no_history, no_link, passthrough]), meck:expect(kpro, send, fun(_Conn, Req) -> Payload = iolist_to_binary(lists:last(tuple_to_list(Req))),