From bcfceda4c107a0a3e427c13341acd9141556b49e Mon Sep 17 00:00:00 2001 From: zmstone Date: Fri, 6 Sep 2024 22:11:01 +0200 Subject: [PATCH] feat: optimize disk buffer writes Previously, the wolff_producer implementation is aggressive when try to enqueue newly received calls, even when max_linger_ms is set to non-zero because the linger was implemented from the popping end of the queue. This change moves the linger timer to the pushing end of the queue, that is, the process will delay enqueue to allow a larger collection of concurrent calls so the write batch towards disk can be larger --- README.md | 2 + changelog.md | 3 + src/wolff.app.src | 2 +- src/wolff_producer.erl | 143 +++++++++++++++++++++++++++------------- src/wolff_producers.erl | 2 +- test/wolff_tests.erl | 23 ++++--- 6 files changed, 119 insertions(+), 56 deletions(-) diff --git a/README.md b/README.md index 54698b9..a783b56 100644 --- a/README.md +++ b/README.md @@ -161,6 +161,8 @@ wolff:send(Producers, [Msg], AckFun). * `max_linger_ms`: Age in milliseconds a batch can stay in queue when the connection is idle (as in no pending acks from kafka). Default=0 (as in send immediately). +* `max_linger_bytes`: Number of bytes to collect before sending it to Kafka. If set to 0, `max_batch_bytes` is taken for mem-only mode, otherwise it's 10 times `max_batch_bytes` (but never exceeds 10MB) to optimize disk write. + * `max_send_ahead`: Number of batches to be sent ahead without receiving ack for the last request. Must be 0 if messages must be delivered in strict order. diff --git a/changelog.md b/changelog.md index 71d5f8e..593256c 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,8 @@ * 4.0.0 - Delete global stats (deprecated since 1.9). + - Move linger delay to front of the buffer queue. + The default value for `max_linger_ms` is `0` as before. + Setting `max_linger_ms=10` will make the disk write batch larger when buffer is configured to disk mode or disk-offload mode. * 3.0.4 - Upgrade to kafka_protocol-4.1.8 diff --git a/src/wolff.app.src b/src/wolff.app.src index efb0a11..6d487a1 100644 --- a/src/wolff.app.src +++ b/src/wolff.app.src @@ -1,6 +1,6 @@ {application, wolff, [{description, "Kafka's publisher"}, - {vsn, "4.0.0"}, + {vsn, "git"}, {registered, []}, {applications, [kernel, diff --git a/src/wolff_producer.erl b/src/wolff_producer.erl index e3bb5f3..a6c616f 100644 --- a/src/wolff_producer.erl +++ b/src/wolff_producer.erl @@ -45,6 +45,7 @@ ack_timeout | max_batch_bytes | max_linger_ms | + max_linger_bytes | max_send_ahead | compression | drop_if_highmem | @@ -60,6 +61,7 @@ ack_timeout => timeout(), max_batch_bytes => pos_integer(), max_linger_ms => non_neg_integer(), + max_linger_bytes => non_neg_integer(), max_send_ahead => non_neg_integer(), compression => kpro:compress_option(), drop_if_highmem => boolean(), @@ -75,6 +77,7 @@ ack_timeout => timeout(), max_batch_bytes => pos_integer(), max_linger_ms => non_neg_integer(), + max_linger_bytes => non_neg_integer(), max_send_ahead => non_neg_integer(), compression => kpro:compress_option(), drop_if_highmem => boolean(), @@ -95,7 +98,9 @@ -define(ACK_CB(AckCb, Partition), {AckCb, Partition}). -define(no_queue_ack, no_queue_ack). -define(no_caller_ack, no_caller_ack). - +-define(MAX_LINGER_BYTES, (10 bsl 20)). +-type ack_fun() :: wolff:ack_fun(). +-type send_req() :: ?SEND_REQ(pid() | reference(), [wolff:msg()], ack_fun()). -type sent() :: #{req_ref := reference(), q_items := [?Q_ITEM(_CallId, _Ts, _Batch)], q_ack_ref := replayq:ack_ref(), @@ -106,7 +111,7 @@ , client_id := wolff:client_id() , config := config_state() , conn := undefined | _ - , ?linger_expire_timer := false | timer:tref() + , ?linger_expire_timer := false | reference() , partition := partition() , pending_acks := #{} % CallId => AckCb , produce_api_vsn := undefined | _ @@ -115,6 +120,7 @@ , sent_reqs_count := non_neg_integer() , inflight_calls := non_neg_integer() , topic := topic() + , calls := empty | #{ts := pos_integer(), bytes := pos_integer(), batch_r := [send_req()]} }. %% @doc Start a per-partition producer worker. @@ -130,6 +136,9 @@ %% exact max allowed message size configured in kafka. %% * `max_linger_ms': Age in milliseconds a baatch can stay in queue when the connection %% is idle (as in no pending acks). Default: 0 (as in send immediately) +%% * `max_linger_bytes': Number of bytes to collect before sending it to Kafka. +%% If set to 0, `max_batch_bytes' is taken for mem-only mode, otherwise it's 10 times +%% `max_batch_bytes' (but never exceeds 10MB) to optimize disk write. %% * `max_send_ahead': Number of batches to be sent ahead without receiving ack for %% the last request. Must be 0 if messages must be delivered in strict order. %% * `compression': `no_compression', `snappy' or `gzip'. @@ -251,7 +260,8 @@ do_init(#{client_id := ClientId, fun(Meta) -> Meta#{partition_id => Partition} end, #{partition_id => Partition}, Config0), - Config = maps:without([replayq_dir, replayq_seg_bytes], Config1), + Config2 = resolve_max_linger_bytes(Config1, Q), + Config = maps:without([replayq_dir, replayq_seg_bytes], Config2), wolff_metrics:queuing_set(Config, replayq:count(Q)), wolff_metrics:inflight_set(Config, 0), St#{replayq => Q, @@ -263,7 +273,8 @@ do_init(#{client_id := ClientId, sent_reqs_count => 0, inflight_calls => 0, conn := undefined, - client_id => ClientId + client_id => ClientId, + calls => empty }. handle_call(stop, From, St) -> @@ -275,11 +286,14 @@ handle_call(_Call, _From, St) -> handle_info({do_init, St0}, _) -> St = do_init(St0), {noreply, St}; -handle_info(?linger_expire, St) -> - {noreply, maybe_send_to_kafka(St#{?linger_expire_timer := false})}; -handle_info(?SEND_REQ(_, Batch, _) = Call, #{config := #{max_batch_bytes := Limit}} = St0) -> - {Calls, _CollectedBytes} = collect_send_calls([Call], batch_bytes(Batch), Limit), - St1 = enqueue_calls(Calls, St0), +handle_info(?linger_expire, St0) -> + St1 = enqueue_calls(St0#{?linger_expire_timer => false}, no_linger), + St = maybe_send_to_kafka(St1), + {noreply, St}; +handle_info(?SEND_REQ(_, Batch, _) = Call, #{calls := Calls0} = St0) -> + Bytes = batch_bytes(Batch), + Calls = collect_send_calls(Call, Bytes, Calls0), + St1 = enqueue_calls(St0#{calls => Calls}, maybe_linger), St = maybe_send_to_kafka(St1), {noreply, St}; handle_info({msg, Conn, Rsp}, #{conn := Conn} = St0) -> @@ -366,11 +380,26 @@ ensure_ts(Batch) -> make_call_id(Base) -> Base + erlang:unique_integer([positive]). +resolve_max_linger_bytes(#{max_linger_bytes := 0, + max_batch_bytes := Mb + } = Config, Q) -> + case replayq:is_mem_only(Q) of + true -> + Config#{max_linger_bytes => Mb}; + false -> + %% when disk or offload mode, try to linger with more bytes + %% to optimize disk write performance + Config#{max_linger_bytes => min(?MAX_LINGER_BYTES, Mb * 10)} + end; +resolve_max_linger_bytes(Config, _Q) -> + Config. + use_defaults(Config) -> use_defaults(Config, [{required_acks, all_isr}, {ack_timeout, 10000}, {max_batch_bytes, ?WOLFF_KAFKA_DEFAULT_MAX_MESSAGE_BYTES}, {max_linger_ms, 0}, + {max_linger_bytes, 0}, {max_send_ahead, 0}, {compression, no_compression}, {reconnect_delay_ms, 2000} @@ -451,38 +480,17 @@ maybe_send_to_kafka(#{conn := Conn, replayq := Q} = St) -> maybe_send_to_kafka_has_pending(St) -> case is_send_ahead_allowed(St) of - true -> maybe_send_to_kafka_now(St); + true -> send_to_kafka(St); false -> St %% reached max inflight limit end. -maybe_send_to_kafka_now(#{?linger_expire_timer := LTimer, - replayq := Q, - config := #{max_linger_ms := Max}} = St) -> - First = replayq:peek(Q), - LingerTimeout = Max - (now_ts() - get_item_ts(First)), - case LingerTimeout =< 0 of - true -> - %% the oldest item is too old, send now - send_to_kafka(St); %% send now - false when is_reference(LTimer) -> - %% timer already started - St; - false -> - %% delay send, try to accumulate more into the batch - Ref = erlang:send_after(LingerTimeout, self(), ?linger_expire), - St#{?linger_expire_timer := Ref} - end. - send_to_kafka(#{sent_reqs := SentReqs, sent_reqs_count := SentReqsCount, inflight_calls := InflightCalls, replayq := Q, config := #{max_batch_bytes := BytesLimit} = Config, - conn := Conn, - ?linger_expire_timer := LTimer + conn := Conn } = St0) -> - %% timer might have already expired, but should do no harm - is_reference(LTimer) andalso erlang:cancel_timer(LTimer), {NewQ, QAckRef, Items} = replayq:pop(Q, #{bytes_limit => BytesLimit, count_limit => 999999999}), wolff_metrics:queuing_set(Config, replayq:count(NewQ)), @@ -491,7 +499,7 @@ send_to_kafka(#{sent_reqs := SentReqs, NewInflightCalls = InflightCalls + NrOfCalls, _ = wolff_metrics:inflight_set(Config, NewInflightCalls), #kpro_req{ref = Ref, no_ack = NoAck} = Req = make_request(Items, St0), - St1 = St0#{replayq := NewQ, ?linger_expire_timer := false}, + St1 = St0#{replayq := NewQ}, Sent = #{req_ref => Ref, q_items => Items, q_ack_ref => QAckRef, @@ -534,8 +542,6 @@ queue_item_marshaller(?Q_ITEM(_, _, _) = I) -> queue_item_marshaller(Bin) when is_binary(Bin) -> binary_to_term(Bin). -get_item_ts(?Q_ITEM(_, Ts, _)) -> Ts. - get_produce_version(#{conn := Conn} = St) when is_pid(Conn) -> Vsn = case kpro:get_api_vsn_range(Conn, produce) of {ok, {_Min, Max}} -> Max; @@ -825,24 +831,69 @@ zz(I) -> (I bsl 1) bxor (I bsr 63). request_async(Conn, Req) when is_pid(Conn) -> ok = kpro:send(Conn, Req). +collect_send_calls(Call, Bytes, empty) -> + Init = #{ts => now_ts(), bytes => 0, batch_r => []}, + collect_send_calls(Call, Bytes, Init); +collect_send_calls(Call, Bytes, #{ts := Ts, bytes := Bytes0, batch_r := BatchR}) -> + collect_send_calls2(#{ts => Ts, bytes => Bytes0 + Bytes, batch_r => [Call | BatchR]}). + %% Collect all send requests which are already in process mailbox -collect_send_calls(Calls, Size, Limit) when Size >= Limit -> - {lists:reverse(Calls), Size}; -collect_send_calls(Calls, Size, Limit) -> +collect_send_calls2(Calls) -> receive ?SEND_REQ(_, Batch, _) = Call -> - collect_send_calls([Call | Calls], Size + batch_bytes(Batch), Limit) + Bytes = batch_bytes(Batch), + collect_send_calls(Call, Bytes, Calls) after 0 -> - {lists:reverse(Calls), Size} + Calls end. -enqueue_calls(Calls, #{replayq := Q, - pending_acks := PendingAcks0, - call_id_base := CallIdBase, - partition := Partition, - config := Config0 - } = St0) -> +ensure_linger_expire_timer_start(#{?linger_expire_timer := false} = St, Timeout) -> + %% delay enqueue, try to accumulate more into the batch + Ref = erlang:send_after(Timeout, self(), ?linger_expire), + St#{?linger_expire_timer := Ref}; +ensure_linger_expire_timer_start(St, _Timeout) -> + %% timer is already started + St. + +ensure_linger_expire_timer_cancel(#{?linger_expire_timer := LTimer} = St) -> + _ = is_reference(LTimer) andalso erlang:cancel_timer(LTimer), + St#{?linger_expire_timer => false}. + +%% check if the call collection should continue to linger before enqueue +is_linger_continue(#{calls := Calls, config := Config}) -> + #{max_linger_ms := MaxLingerMs, max_linger_bytes := MaxLingerBytes} = Config, + #{ts := Ts, bytes := Bytes} = Calls, + TimeLeft = MaxLingerMs - (now_ts() - Ts), + case TimeLeft =< 0 of + true -> + false; + false -> + (Bytes < MaxLingerBytes) andalso {true, TimeLeft} + end. + +enqueue_calls(#{calls := empty} = St, _) -> + %% no call to enqueue + St; +enqueue_calls(St, maybe_linger) -> + case is_linger_continue(St) of + {true, Timeout} -> + ensure_linger_expire_timer_start(St, Timeout); + false -> + enqueue_calls(St, no_linger) + end; +enqueue_calls(#{calls := #{batch_r := CallsR}} = St0, no_linger) -> + Calls = lists:reverse(CallsR), + St = ensure_linger_expire_timer_cancel(St0), + enqueue_calls2(Calls, St#{calls => empty}). + +enqueue_calls2(Calls, + #{replayq := Q, + pending_acks := PendingAcks0, + call_id_base := CallIdBase, + partition := Partition, + config := Config0 + } = St0) -> {QueueItems, PendingAcks, CallByteSize} = lists:foldl( fun(?SEND_REQ(_From, Batch, AckFun), {Items, PendingAcksIn, Size}) -> diff --git a/src/wolff_producers.erl b/src/wolff_producers.erl index 068d8b2..0c1edd0 100644 --- a/src/wolff_producers.erl +++ b/src/wolff_producers.erl @@ -330,7 +330,7 @@ pick_next_alive(LookupFn, Partition, Count) -> pick_next_alive(LookupFn, (Partition + 1) rem Count, Count, _Tried = 1). pick_next_alive(_LookupFn, _Partition, Count, Count) -> - throw(#{cause => all_producers_down}); + throw(#{cause => all_producers_down, count => Count}); pick_next_alive(LookupFn, Partition, Count, Tried) -> Pid = LookupFn(Partition), case is_alive(Pid) of diff --git a/test/wolff_tests.erl b/test/wolff_tests.erl index d394a04..4ca1131 100644 --- a/test/wolff_tests.erl +++ b/test/wolff_tests.erl @@ -324,20 +324,25 @@ replayq_overflow_test() -> Msg = #{key => <<>>, value => <<"12345">>}, Batch = [Msg, Msg], BatchSize = wolff_producer:batch_bytes(Batch), - ProducerCfg = #{max_batch_bytes => 1, %% make sure not collecting calls into one batch + LingerMs = 100, + ProducerCfg = #{max_batch_bytes => 1, %% ensure send one call at a time replayq_max_total_bytes => BatchSize, required_acks => all_isr, - max_linger_ms => 1000 %% do not send to kafka immediately + max_linger_ms => LingerMs, %% delay enqueue + max_linger_bytes => BatchSize + 1 %% delay enqueue }, {ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg), + Pid = wolff_producers:lookup_producer(Producers, 0), + ?assert(is_process_alive(Pid)), TesterPid = self(), AckFun1 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_1, BaseOffset}, ok end, AckFun2 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_2, BaseOffset}, ok end, SendF = fun(AckFun) -> wolff:send(Producers, Batch, AckFun) end, %% send two batches to overflow one - spawn(fun() -> SendF(AckFun1) end), + proc_lib:spawn_link(fun() -> SendF(AckFun1) end), timer:sleep(1), %% ensure order - spawn(fun() -> SendF(AckFun2) end), + proc_lib:spawn_link(fun() -> SendF(AckFun2) end), + timer:sleep(LingerMs * 2), try %% pushed out of replayq due to overflow receive @@ -363,7 +368,7 @@ replayq_overflow_test() -> [1] = get_telemetry_seq(CntrEventsTable, [wolff, dropped_queue_full]), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])), - [0,1,2,1,0]), + [0,2,1,0]), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])), [0,1,0]), @@ -610,8 +615,10 @@ test_message_too_large() -> %% then it should retry sending one message at a time ProducerCfg = #{partitioner => fun(_, _) -> 0 end, max_batch_bytes => MaxMessageBytes * 3, - %% ensure batching - max_linger_ms => 100 + %% ensure batching by delay enqueue by 100 seconds + max_linger_ms => 100, + %% ensure linger is not expired by reaching size + max_linger_bytes => MaxMessageBytes * 100 }, {ok, Producers} = wolff:start_producers(Client, TopicBin, ProducerCfg), MaxBytesCompensateOverhead = MaxMessageBytes - ?BATCHING_OVERHEAD - 7, @@ -620,7 +627,7 @@ test_message_too_large() -> Ref = make_ref(), Self = self(), AckFun = {fun ?MODULE:ack_cb/4, [Self, Ref]}, - _ = wolff:send(Producers, Batch, AckFun), + spawn(fun() -> wolff:send(Producers, Batch, AckFun) end), fun() -> ?WAIT(5000, {ack, Ref, _Partition, BaseOffset}, BaseOffset) end end, %% Must be ok to send one message