Skip to content

Commit

Permalink
Merge pull request #79 from zmstone/0911-minimize-sync-send-ref
Browse files Browse the repository at this point in the history
refactor: minimize the reply context for sync-send
  • Loading branch information
zmstone authored Sep 14, 2024
2 parents 7bb150b + 1a058f8 commit 3c4f867
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 44 deletions.
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{deps, [ {kafka_protocol, "4.1.8"}
{deps, [ {kafka_protocol, "4.1.9"}
, {replayq, "0.3.4"}
, {lc, "0.3.2"}
, {telemetry, "1.1.0"}
Expand Down
23 changes: 0 additions & 23 deletions rebar.lock

This file was deleted.

50 changes: 30 additions & 20 deletions src/wolff_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
-define(MIN_DISCARD_LOG_INTERVAL, 5000).

%% APIs
-export([start_link/5, stop/1, send/3, send/4, send_sync/3, ack_cb/4]).
-export([start_link/5, stop/1, send/3, send/4, send_sync/3]).

%% gen_server callbacks
-export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]).
Expand Down Expand Up @@ -98,14 +98,22 @@
-define(ACK_CB(AckCb, Partition), {AckCb, Partition}).
-define(no_queue_ack, no_queue_ack).
-define(MAX_LINGER_BYTES, (10 bsl 20)).
-define(EMPTY, empty).
-define(SYNC_REF(Caller, Ref), {Caller, Ref}).
-define(IS_SYNC_REF(Caller, Ref), ((is_reference(Caller) orelse is_pid(Caller)) andalso is_reference(Ref))).

-type ack_fun() :: wolff:ack_fun().
-type send_req() :: ?SEND_REQ({pid(), reference()}, [wolff:msg()], ack_fun()).
-type sent() :: #{req_ref := reference(),
q_items := [?Q_ITEM(_CallId, _Ts, _Batch)],
q_ack_ref := replayq:ack_ref(),
attempts := pos_integer()
}.

-type sync_refs() :: {pid() | reference(), reference()}.
-type calls() :: #{ts := pos_integer(),
bytes := pos_integer(),
batch_r := [send_req()]
}.
-type state() :: #{ client_id := wolff:client_id()
, config := config_state()
, conn := undefined | _
Expand All @@ -118,7 +126,7 @@
, sent_reqs_count := non_neg_integer()
, inflight_calls := non_neg_integer()
, topic := topic()
, calls := empty | #{ts := pos_integer(), bytes := pos_integer(), batch_r := [send_req()]}
, calls := ?EMPTY | calls()
}.

%% @doc Start a per-partition producer worker.
Expand Down Expand Up @@ -171,7 +179,7 @@ send(Pid, Batch, AckFun) ->
%% NOTE: It's possible that two or more batches get included into one produce request.
%% But a batch is never split into produce requests.
%% Make sure it will not exceed the `max_batch_bytes' limit when sending a batch.
-spec send(pid(), [wolff:msg()], wolff:ack_fun(), WaitForQueued::wait_for_queued | no_wait_for_queued) -> ok.
-spec send(pid(), [wolff:msg()], wolff:ack_fun() | sync_refs(), WaitForQueued::wait_for_queued | no_wait_for_queued) -> ok.
send(Pid, [_ | _] = Batch0, AckFun, wait_for_queued) ->
Caller = self(),
Mref = erlang:monitor(process, Pid),
Expand All @@ -195,11 +203,11 @@ send(Pid, [_ | _] = Batch0, AckFun, no_wait_for_queued) ->
send_sync(Pid, Batch0, Timeout) ->
Caller = caller(),
Mref = erlang:monitor(process, Pid),
%% synced local usage, safe to use anonymous fun
AckFun = {fun ?MODULE:ack_cb/4, [Caller, Mref]},
ok = send(Pid, Batch0, AckFun, no_wait_for_queued),
Ack = ?SYNC_REF(Caller, Mref),
ok = send(Pid, Batch0, Ack, no_wait_for_queued),
receive
{Mref, Partition, BaseOffset} ->
%% sent from eval_ack_cb
erlang:demonitor(Mref, [flush]),
{Partition, BaseOffset};
{'DOWN', Mref, _, _, Reason} ->
Expand All @@ -217,11 +225,6 @@ send_sync(Pid, Batch0, Timeout) ->
end
end.

%% @hidden Callbak exported for send_sync/3.
ack_cb(Partition, BaseOffset, Caller, Mref) ->
_ = erlang:send(Caller, {Mref, Partition, BaseOffset}),
ok.

init(St) ->
erlang:process_flag(trap_exit, true),
%% ensure init/1 can never fail
Expand Down Expand Up @@ -282,7 +285,7 @@ do_init(#{client_id := ClientId,
inflight_calls => 0,
conn := undefined,
client_id => ClientId,
calls => empty
calls => ?EMPTY
}.

handle_call(stop, From, St) ->
Expand Down Expand Up @@ -831,12 +834,16 @@ request_async(Conn, Req) when is_pid(Conn) ->

%% collect send calls which are already sent to mailbox,
%% the collection is size-limited by the max_linger_bytes config.
collect_send_calls(Call, Bytes, empty, Max) ->
collect_send_calls(Call, Bytes, ?EMPTY, Max) ->
Init = #{ts => now_ts(), bytes => 0, batch_r => []},
collect_send_calls(Call, Bytes, Init, Max);
collect_send_calls(Call, Bytes, #{ts := Ts, bytes := Bytes0, batch_r := BatchR} = Calls, Max) ->
collect_send_calls(Call, Bytes, Calls, Max) ->
#{ts := Ts, bytes := Bytes0, batch_r := BatchR} = Calls,
Sum = Bytes0 + Bytes,
R = Calls#{ts => Ts, bytes => Sum, batch_r => [Call | BatchR]},
R = Calls#{ts => Ts,
bytes => Sum,
batch_r => [Call | BatchR]
},
case Sum < Max of
true ->
collect_send_calls2(R, Max);
Expand Down Expand Up @@ -879,7 +886,7 @@ is_linger_continue(#{calls := Calls, config := Config}) ->
false
end.

enqueue_calls(#{calls := empty} = St, _) ->
enqueue_calls(#{calls := ?EMPTY} = St, _) ->
%% no call to enqueue
St;
enqueue_calls(St, maybe_linger) ->
Expand All @@ -892,7 +899,7 @@ enqueue_calls(St, maybe_linger) ->
enqueue_calls(#{calls := #{batch_r := CallsR}} = St0, no_linger) ->
Calls = lists:reverse(CallsR),
St = ensure_linger_expire_timer_cancel(St0),
enqueue_calls2(Calls, St#{calls => empty}).
enqueue_calls2(Calls, St#{calls => ?EMPTY}).

enqueue_calls2(Calls,
#{replayq := Q,
Expand Down Expand Up @@ -933,9 +940,12 @@ maybe_reply_queued(?SEND_REQ({Caller, Ref}, _, _)) ->

eval_ack_cb(?ACK_CB(AckFun, Partition), BaseOffset) when is_function(AckFun, 2) ->
ok = AckFun(Partition, BaseOffset); %% backward compatible
eval_ack_cb(?ACK_CB({F, A}, Partition), BaseOffset) ->
eval_ack_cb(?ACK_CB({F, A}, Partition), BaseOffset) when is_function(F) ->
true = is_function(F, length(A) + 2),
ok = erlang:apply(F, [Partition, BaseOffset | A]).
ok = erlang:apply(F, [Partition, BaseOffset | A]);
eval_ack_cb(?ACK_CB({Caller, Ref}, Partition), BaseOffset) when ?IS_SYNC_REF(Caller, Ref) ->
_ = erlang:send(Caller, {Ref, Partition, BaseOffset}),
ok.

handle_overflow(St, Overflow) when Overflow =< 0 ->
ok = maybe_log_discard(St, 0),
Expand Down

0 comments on commit 3c4f867

Please sign in to comment.