feat: add max_partitions limit #59

Merged · 1 commit · Jan 30, 2024
3 changes: 3 additions & 0 deletions changelog.md
@@ -1,3 +1,6 @@
* 1.10.0
- Add `max_partitions` producer config to limit the number of partition producers, so that the client side can also control resource utilization.

* 1.9.1
- Use ETS (named `wolff_clients_global`) for client ID registration.
When there are thousands of clients, `supervisor:which_children` becomes quite expensive.
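For illustration, a minimal sketch of how a caller could enable the new limit. It mirrors the producer config used in the test added by this PR; the client ID, topic and partitioner below are placeholder values:

    %% Sketch only: cap this producer set at one partition producer,
    %% even if the topic has more partitions.
    ProducerCfg = #{required_acks => all_isr,
                    partitioner => 0,
                    max_partitions => 1},
    {ok, Producers} = wolff:ensure_supervised_producers(<<"my-client">>, <<"my-topic">>, ProducerCfg).
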
41 changes: 28 additions & 13 deletions src/wolff_client.erl
@@ -19,7 +19,11 @@

%% APIs
-export([start_link/3, stop/1]).
-export([get_leader_connections/2, recv_leader_connection/4, get_id/1, delete_producers_metadata/2]).
-export([get_leader_connections/2,
get_leader_connections/3,
recv_leader_connection/5,
get_id/1,
delete_producers_metadata/2]).
-export([check_connectivity/1, check_connectivity/2]).
-export([check_if_topic_exists/2, check_if_topic_exists/3, check_topic_exists_with_client_pid/2]).

Expand Down Expand Up @@ -88,7 +92,12 @@ get_id(Pid) ->
-spec get_leader_connections(pid(), topic()) ->
{ok, [{partition(), pid() | ?conn_down(_)}]} | {error, any()}.
get_leader_connections(Client, Topic) ->
safe_call(Client, {get_leader_connections, Topic}).
safe_call(Client, {get_leader_connections, Topic, all_partitions}).

-spec get_leader_connections(pid(), topic(), pos_integer()) ->
{ok, [{partition(), pid() | ?conn_down(_)}]} | {error, any()}.
get_leader_connections(Client, Topic, MaxPartitions) ->
safe_call(Client, {get_leader_connections, Topic, MaxPartitions}).

-spec check_connectivity(pid()) -> ok | {error, any()}.
check_connectivity(Pid) ->
@@ -140,8 +149,8 @@ safe_call(Pid, Call) ->
end.

%% request client to send Pid the leader connection.
recv_leader_connection(Client, Topic, Partition, Pid) ->
gen_server:cast(Client, {recv_leader_connection, Topic, Partition, Pid}).
recv_leader_connection(Client, Topic, Partition, Pid, MaxPartitions) ->
gen_server:cast(Client, {recv_leader_connection, Topic, Partition, Pid, MaxPartitions}).

delete_producers_metadata(Client, Topic) ->
gen_server:cast(Client, {delete_producers_metadata, Topic}).
@@ -157,8 +166,8 @@ handle_call(get_id, _From, #{client_id := Id} = St) ->
{reply, Id, St};
handle_call({check_if_topic_exists, Topic}, _From, #{seed_hosts := Hosts, conn_config := ConnConfig} = St) ->
{reply, check_if_topic_exists(Hosts, ConnConfig, Topic), St};
handle_call({get_leader_connections, Topic}, _From, St0) ->
case ensure_leader_connections(St0, Topic) of
handle_call({get_leader_connections, Topic, MaxPartitions}, _From, St0) ->
case ensure_leader_connections(St0, Topic, MaxPartitions) of
{ok, St} ->
Result = do_get_leader_connections(St, Topic),
{reply, {ok, Result}, St};
@@ -180,8 +189,8 @@ handle_info(_Info, St) ->

handle_cast(Cast, #{connect := _Fun} = St) ->
handle_cast(Cast, upgrade(St));
handle_cast({recv_leader_connection, Topic, Partition, Caller}, St0) ->
case ensure_leader_connections(St0, Topic) of
handle_cast({recv_leader_connection, Topic, Partition, Caller, MaxConnections}, St0) ->
case ensure_leader_connections(St0, Topic, MaxConnections) of
{ok, St} ->
Partitions = do_get_leader_connections(St, Topic),
%% the Partition in argument is a result of ensure_leader_connections
Expand Down Expand Up @@ -258,20 +267,21 @@ is_metadata_fresh(#{metadata_ts := Topics, config := Config}, Topic) ->
Ts -> timer:now_diff(erlang:timestamp(), Ts) < MinInterval * 1000
end.

-spec ensure_leader_connections(state(), topic()) ->
-spec ensure_leader_connections(state(), topic(), all_partitions | pos_integer()) ->
{ok, state()} | {error, any()}.
ensure_leader_connections(St, Topic) ->
ensure_leader_connections(St, Topic, MaxPartitions) ->
case is_metadata_fresh(St, Topic) of
true -> {ok, St};
false -> do_ensure_leader_connections(St, Topic)
false -> do_ensure_leader_connections(St, Topic, MaxPartitions)
end.

do_ensure_leader_connections(#{conn_config := ConnConfig,
seed_hosts := SeedHosts,
metadata_ts := MetadataTs
} = St0, Topic) ->
} = St0, Topic, MaxPartitions) ->
case get_metadata(SeedHosts, ConnConfig, Topic) of
{ok, {Brokers, PartitionMetaList}} ->
{ok, {Brokers, PartitionMetaList0}} ->
PartitionMetaList = limit_partitions_count(PartitionMetaList0, MaxPartitions),
St = lists:foldl(fun(PartitionMeta, StIn) ->
ensure_leader_connection(StIn, Brokers, Topic, PartitionMeta)
end, St0, PartitionMetaList),
@@ -281,6 +291,11 @@ do_ensure_leader_connections(#{conn_config := ConnConfig,
{error, failed_to_fetch_metadata}
end.

limit_partitions_count(PartitionMetaList, Max) when is_integer(Max) andalso Max < length(PartitionMetaList) ->
lists:sublist(PartitionMetaList, Max);
limit_partitions_count(PartitionMetaList, all_partitions) ->
PartitionMetaList.

%% This function ensures each Topic-Partition pair has a connection record
%% either a pid when the leader is healthy, or the error reason
%% if failed to discover the leader or failed to connect to the leader
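As a rough sketch of the extended client API above (ClientPid and the topic are placeholders), the new arity-3 call caps how many partition leaders are discovered and connected, while get_leader_connections/2 keeps the old behaviour by passing all_partitions internally:

    %% Sketch: fetch leader connections for at most 2 partitions of the topic.
    {ok, Connections} = wolff_client:get_leader_connections(ClientPid, <<"my-topic">>, 2).
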
11 changes: 7 additions & 4 deletions src/wolff_producer.erl
@@ -47,7 +47,8 @@
max_send_ahead |
compression |
drop_if_highmem |
telemetry_meta_data.
telemetry_meta_data |
max_partitions.

-type config() :: #{replayq_dir := string(),
replayq_max_total_bytes => pos_integer(),
@@ -61,7 +62,8 @@
compression => kpro:compress_option(),
drop_if_highmem => boolean(),
telemetry_meta_data => map(),
enable_global_stats => boolean()
enable_global_stats => boolean(),
max_partitions => pos_integer()
}.

-define(no_timer, no_timer).
Expand Down Expand Up @@ -590,13 +592,14 @@ log_connection_down(Topic, Partition, Conn, Reason) ->
"connection_to_partition_leader_error",
#{conn => Conn, reason => Reason}).

ensure_delayed_reconnect(#{config := #{reconnect_delay_ms := Delay0},
ensure_delayed_reconnect(#{config := #{reconnect_delay_ms := Delay0} = Config,
client_id := ClientId,
topic := Topic,
partition := Partition,
reconnect_timer := ?no_timer
} = St, DelayStrategy) ->
Attempts = maps:get(reconnect_attempts, St, 0),
MaxPartitions = maps:get(max_partitions, Config, all_partitions),
Attempts > 0 andalso Attempts rem 10 =:= 0 andalso
log_error(Topic, Partition,
"producer_is_still_disconnected_after_retry",
@@ -613,7 +616,7 @@
end,
case wolff_client_sup:find_client(ClientId) of
{ok, ClientPid} ->
Args = [ClientPid, Topic, Partition, self()],
Args = [ClientPid, Topic, Partition, self(), MaxPartitions],
{ok, Tref} = timer:apply_after(Delay, wolff_client, recv_leader_connection, Args),
St#{reconnect_timer => Tref, reconnect_attempts => Attempts + 1};
{error, Reason} ->
10 changes: 7 additions & 3 deletions src/wolff_producers.erl
@@ -20,6 +20,7 @@
-export([start_linked_producers/3, stop_linked/1]).
-export([start_supervised/3, stop_supervised/1, stop_supervised/2]).
-export([pick_producer/2, lookup_producer/2, cleanup_workers_table/2]).
-export([find_producers_by_client_topic/2]).

%% gen_server callbacks
-export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]).
Expand Down Expand Up @@ -61,6 +62,7 @@
-define(partition_count_refresh_interval_seconds, 300).
-define(refresh_partition_count, refresh_partition_count).
-define(partition_count_unavailable, -1).
-define(all_partitions, all_partitions).

%% @doc Called by wolff_producers_sup to start wolff_producers process.
start_link(ClientId, Topic, Config) ->
@@ -83,7 +85,8 @@ start_linked_producers(ClientPid, Topic, ProducerCfg) when is_pid(ClientPid) ->
start_linked_producers(ClientId, ClientPid, Topic, ProducerCfg).

start_linked_producers(ClientId, ClientPid, Topic, ProducerCfg) ->
case wolff_client:get_leader_connections(ClientPid, Topic) of
MaxPartitions = maps:get(max_partitions, ProducerCfg, ?all_partitions),
case wolff_client:get_leader_connections(ClientPid, Topic, MaxPartitions) of
{ok, Connections} ->
Workers = start_link_producers(ClientId, Topic, Connections, ProducerCfg),
ok = put_partition_cnt(ClientId, Topic, maps:size(Workers)),
Expand Down Expand Up @@ -416,8 +419,9 @@ refresh_partition_count(#{client_pid := Pid} = St) when not is_pid(Pid) ->
refresh_partition_count(#{producers_status := ?not_initialized} = St) ->
%% to be initialized
St;
refresh_partition_count(#{client_pid := Pid, topic := Topic} = St) ->
case wolff_client:get_leader_connections(Pid, Topic) of
refresh_partition_count(#{client_pid := Pid, topic := Topic, config := Config} = St) ->
MaxPartitions = maps:get(max_partitions, Config, ?all_partitions),
case wolff_client:get_leader_connections(Pid, Topic, MaxPartitions) of
{ok, Connections} ->
start_new_producers(St, Connections);
{error, Reason} ->
24 changes: 24 additions & 0 deletions test/wolff_supervised_tests.erl
@@ -123,6 +123,30 @@ test_client_restart(ClientId, Topic, Partition) ->
wolff_tests:deinstall_event_logging(?FUNCTION_NAME),
ok.

max_partitions_test() ->
ClientId = <<"max-partitions-test">>,
Topic = <<"test-topic-2">>,
Partition = 0,
MaxPartitions = 1,
_ = application:stop(wolff), %% ensure stopped
{ok, _} = application:ensure_all_started(wolff),
ClientCfg = #{connection_strategy => per_partition},
{ok, _ClientPid} = wolff:ensure_supervised_client(ClientId, ?HOSTS, ClientCfg),
ProducerCfg = #{required_acks => all_isr,
partitioner => Partition,
max_partitions => MaxPartitions
},
{ok, Producers} = wolff:ensure_supervised_producers(ClientId, Topic, ProducerCfg),
%% the topic has two partitions, but due to the limit only one producer is started
?assertMatch([_], wolff_producers:find_producers_by_client_topic(ClientId, Topic)),
%% cleanup
ok = wolff:stop_and_delete_supervised_producers(Producers),
?assertEqual([], supervisor:which_children(wolff_producers_sup)),
ok = wolff:stop_and_delete_supervised_client(ClientId),
?assertEqual([], supervisor:which_children(wolff_client_sup)),
ok = application:stop(wolff),
ok.

%% Test against a bad host.
%% No connection will be established at all.
%% Producer workers should not crash, async APIs should work.