From 0408116fa27b5891513d5e8b90e462ecbeff200b Mon Sep 17 00:00:00 2001 From: zmstone Date: Fri, 19 Jul 2024 09:14:43 +0200 Subject: [PATCH] refactor: remove roundrobin partitioner roudrobin was never used, and it was implemented wrong (process dict) if needed, it should be re-implemented using an atomic --- README.md | 2 -- src/wolff_producers.erl | 32 +++++++++++--------------------- test/wolff_tests.erl | 1 - 3 files changed, 11 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 2aea154..54698b9 100644 --- a/README.md +++ b/README.md @@ -167,8 +167,6 @@ wolff:send(Producers, [Msg], AckFun). * `compression`: `no_compression` (default) `snappy` or `gzip`. * `partitioner`: default `random`. other possible values are: - `roundrobin`: Starts from partition `0`, next partition to use is saved in caller's - process dictionary. `first_key_dispatch`: `erlang:phash2(Key) rem PartitionCount` where Key is the `key` field of the first message in the batch to produce. `fun((PartitionCount, [msg()]) -> partition())`: Caller defined callback. diff --git a/src/wolff_producers.erl b/src/wolff_producers.erl index 4fb6ec7..feb4613 100644 --- a/src/wolff_producers.erl +++ b/src/wolff_producers.erl @@ -49,7 +49,6 @@ wolff_producer:config_key(). -type config() :: #{config_key() => term()}. -type partitioner() :: random %% default - | roundrobin | first_key_dispatch | fun((PartitionCount :: pos_integer(), [wolff:msg()]) -> partition()) | partition(). @@ -183,21 +182,20 @@ pick_producer(#{partitioner := Partitioner, topic := TopicOrAlias }, Batch) -> Count = get_partition_cnt(ClientId, TopicOrAlias), - Partition = pick_partition(Count, Partitioner, Batch), LookupFn = fun(P) -> {ok, Pid} = find_producer_by_partition(ClientId, TopicOrAlias, P), Pid end, try + Partition = pick_partition(Count, Partitioner, Batch), do_pick_producer(Partitioner, Partition, Count, LookupFn) catch throw:Reason -> - erlang:throw(#{reason => Reason, + erlang:throw(Reason#{ topic => get_topic(TopicOrAlias), alias => get_alias(TopicOrAlias), - partition => Partition, client => ClientId - }) + }) end. do_pick_producer(Partitioner, Partition0, Count, LookupFn) -> @@ -206,19 +204,15 @@ do_pick_producer(Partitioner, Partition0, Count, LookupFn) -> true -> {Partition0, Pid0}; false when Partitioner =:= random -> pick_next_alive(LookupFn, Partition0, Count); - false when Partitioner =:= roundrobin -> - R = {Partition1, _Pid1} = pick_next_alive(LookupFn, Partition0, Count), - _ = put(wolff_roundrobin, (Partition1 + 1) rem Count), - R; false -> - throw(producer_down) + throw(#{cause => producer_down, partition => Partition0}) end. pick_next_alive(LookupFn, Partition, Count) -> pick_next_alive(LookupFn, (Partition + 1) rem Count, Count, _Tried = 1). pick_next_alive(_LookupFn, _Partition, Count, Count) -> - throw(all_producers_down); + throw(#{cause => all_producers_down}); pick_next_alive(LookupFn, Partition, Count, Tried) -> Pid = LookupFn(Partition), case is_alive(Pid) of @@ -228,22 +222,18 @@ pick_next_alive(LookupFn, Partition, Count, Tried) -> is_alive(Pid) -> is_pid(Pid) andalso is_process_alive(Pid). +pick_partition(Count, Partitioner, _) when not is_integer(Count); + Count =< 0 -> + throw(#{cause => invalid_partition_count, + count => Count, + partitioner => Partitioner + }); pick_partition(_Count, Partition, _) when is_integer(Partition) -> Partition; pick_partition(Count, F, Batch) when is_function(F) -> F(Count, Batch); -pick_partition(Count, Partitioner, _) when not is_integer(Count); - Count =< 0 -> - error({invalid_partition_count, Count, Partitioner}); pick_partition(Count, random, _) -> rand:uniform(Count) - 1; -pick_partition(Count, roundrobin, _) -> - Partition = case get(wolff_roundrobin) of - undefined -> 0; - Number -> Number - end, - _ = put(wolff_roundrobin, (Partition + 1) rem Count), - Partition; pick_partition(Count, first_key_dispatch, [#{key := Key} | _]) -> erlang:phash2(Key) rem Count. diff --git a/test/wolff_tests.erl b/test/wolff_tests.erl index 758c107..235d2a7 100644 --- a/test/wolff_tests.erl +++ b/test/wolff_tests.erl @@ -227,7 +227,6 @@ test_connection_restart() -> ProducerCfg0 = producer_config(), %% allow message linger, so we have time to kill the connection ProducerCfg = ProducerCfg0#{max_linger_ms => 200, - partitioner => roundrobin, reconnect_delay_ms => 0 }, {ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg),