Skip to content

Commit

Permalink
Merge pull request #69 from zmstone/0719-refactor
Browse files Browse the repository at this point in the history
refactor: remove roundrobin partitioner
  • Loading branch information
zmstone authored Jul 19, 2024
2 parents a13b2eb + 0408116 commit 2bb47aa
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 24 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,6 @@ wolff:send(Producers, [Msg], AckFun).
* `compression`: `no_compression` (default) `snappy` or `gzip`.

* `partitioner`: default `random`. other possible values are:
`roundrobin`: Starts from partition `0`, next partition to use is saved in caller's
process dictionary.
`first_key_dispatch`: `erlang:phash2(Key) rem PartitionCount` where Key is the `key`
field of the first message in the batch to produce.
`fun((PartitionCount, [msg()]) -> partition())`: Caller defined callback.
Expand Down
32 changes: 11 additions & 21 deletions src/wolff_producers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
wolff_producer:config_key().
-type config() :: #{config_key() => term()}.
-type partitioner() :: random %% default
| roundrobin
| first_key_dispatch
| fun((PartitionCount :: pos_integer(), [wolff:msg()]) -> partition())
| partition().
Expand Down Expand Up @@ -183,21 +182,20 @@ pick_producer(#{partitioner := Partitioner,
topic := TopicOrAlias
}, Batch) ->
Count = get_partition_cnt(ClientId, TopicOrAlias),
Partition = pick_partition(Count, Partitioner, Batch),
LookupFn = fun(P) ->
{ok, Pid} = find_producer_by_partition(ClientId, TopicOrAlias, P),
Pid
end,
try
Partition = pick_partition(Count, Partitioner, Batch),
do_pick_producer(Partitioner, Partition, Count, LookupFn)
catch
throw:Reason ->
erlang:throw(#{reason => Reason,
erlang:throw(Reason#{
topic => get_topic(TopicOrAlias),
alias => get_alias(TopicOrAlias),
partition => Partition,
client => ClientId
})
})
end.

do_pick_producer(Partitioner, Partition0, Count, LookupFn) ->
Expand All @@ -206,19 +204,15 @@ do_pick_producer(Partitioner, Partition0, Count, LookupFn) ->
true -> {Partition0, Pid0};
false when Partitioner =:= random ->
pick_next_alive(LookupFn, Partition0, Count);
false when Partitioner =:= roundrobin ->
R = {Partition1, _Pid1} = pick_next_alive(LookupFn, Partition0, Count),
_ = put(wolff_roundrobin, (Partition1 + 1) rem Count),
R;
false ->
throw(producer_down)
throw(#{cause => producer_down, partition => Partition0})
end.

pick_next_alive(LookupFn, Partition, Count) ->
pick_next_alive(LookupFn, (Partition + 1) rem Count, Count, _Tried = 1).

pick_next_alive(_LookupFn, _Partition, Count, Count) ->
throw(all_producers_down);
throw(#{cause => all_producers_down});
pick_next_alive(LookupFn, Partition, Count, Tried) ->
Pid = LookupFn(Partition),
case is_alive(Pid) of
Expand All @@ -228,22 +222,18 @@ pick_next_alive(LookupFn, Partition, Count, Tried) ->

is_alive(Pid) -> is_pid(Pid) andalso is_process_alive(Pid).

pick_partition(Count, Partitioner, _) when not is_integer(Count);
Count =< 0 ->
throw(#{cause => invalid_partition_count,
count => Count,
partitioner => Partitioner
});
pick_partition(_Count, Partition, _) when is_integer(Partition) ->
Partition;
pick_partition(Count, F, Batch) when is_function(F) ->
F(Count, Batch);
pick_partition(Count, Partitioner, _) when not is_integer(Count);
Count =< 0 ->
error({invalid_partition_count, Count, Partitioner});
pick_partition(Count, random, _) ->
rand:uniform(Count) - 1;
pick_partition(Count, roundrobin, _) ->
Partition = case get(wolff_roundrobin) of
undefined -> 0;
Number -> Number
end,
_ = put(wolff_roundrobin, (Partition + 1) rem Count),
Partition;
pick_partition(Count, first_key_dispatch, [#{key := Key} | _]) ->
erlang:phash2(Key) rem Count.

Expand Down
1 change: 0 additions & 1 deletion test/wolff_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ test_connection_restart() ->
ProducerCfg0 = producer_config(),
%% allow message linger, so we have time to kill the connection
ProducerCfg = ProducerCfg0#{max_linger_ms => 200,
partitioner => roundrobin,
reconnect_delay_ms => 0
},
{ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg),
Expand Down

0 comments on commit 2bb47aa

Please sign in to comment.