From 168ae92a6aa34212c4ecaaecb15c70144d92e69b Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 29 Jan 2024 21:00:29 +0100 Subject: [PATCH] feat: add max_partitions limit --- changelog.md | 3 +++ src/wolff_client.erl | 41 ++++++++++++++++++++++----------- src/wolff_producer.erl | 11 +++++---- src/wolff_producers.erl | 10 +++++--- test/wolff_supervised_tests.erl | 24 +++++++++++++++++++ 5 files changed, 69 insertions(+), 20 deletions(-) diff --git a/changelog.md b/changelog.md index d1971be..73cf51d 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,6 @@ +* 1.10.0 + - Add `max_partitions` producer config to limit the number of partition producers so that the client side can also control resource utilization. + * 1.9.1 - Use ETS (named `wolff_clients_global`) for client ID registration. When there are thousands of clients, `supervisor:which_children` becomes quite expensive. diff --git a/src/wolff_client.erl b/src/wolff_client.erl index 768b8f6..dc9b278 100644 --- a/src/wolff_client.erl +++ b/src/wolff_client.erl @@ -19,7 +19,11 @@ %% APIs -export([start_link/3, stop/1]). --export([get_leader_connections/2, recv_leader_connection/4, get_id/1, delete_producers_metadata/2]). +-export([get_leader_connections/2, + get_leader_connections/3, + recv_leader_connection/5, + get_id/1, + delete_producers_metadata/2]). -export([check_connectivity/1, check_connectivity/2]). -export([check_if_topic_exists/2, check_if_topic_exists/3, check_topic_exists_with_client_pid/2]). @@ -88,7 +92,12 @@ get_id(Pid) -> -spec get_leader_connections(pid(), topic()) -> {ok, [{partition(), pid() | ?conn_down(_)}]} | {error, any()}. get_leader_connections(Client, Topic) -> - safe_call(Client, {get_leader_connections, Topic}). + safe_call(Client, {get_leader_connections, Topic, all_partitions}). + +-spec get_leader_connections(pid(), topic(), all_partitions | pos_integer()) -> + {ok, [{partition(), pid() | ?conn_down(_)}]} | {error, any()}.
+get_leader_connections(Client, Topic, MaxPartitions) -> + safe_call(Client, {get_leader_connections, Topic, MaxPartitions}). -spec check_connectivity(pid()) -> ok | {error, any()}. check_connectivity(Pid) -> @@ -140,8 +149,8 @@ safe_call(Pid, Call) -> end. %% request client to send Pid the leader connection. -recv_leader_connection(Client, Topic, Partition, Pid) -> - gen_server:cast(Client, {recv_leader_connection, Topic, Partition, Pid}). +recv_leader_connection(Client, Topic, Partition, Pid, MaxPartitions) -> + gen_server:cast(Client, {recv_leader_connection, Topic, Partition, Pid, MaxPartitions}). delete_producers_metadata(Client, Topic) -> gen_server:cast(Client, {delete_producers_metadata, Topic}). @@ -157,8 +166,8 @@ handle_call(get_id, _From, #{client_id := Id} = St) -> {reply, Id, St}; handle_call({check_if_topic_exists, Topic}, _From, #{seed_hosts := Hosts, conn_config := ConnConfig} = St) -> {reply, check_if_topic_exists(Hosts, ConnConfig, Topic), St}; -handle_call({get_leader_connections, Topic}, _From, St0) -> - case ensure_leader_connections(St0, Topic) of +handle_call({get_leader_connections, Topic, MaxPartitions}, _From, St0) -> + case ensure_leader_connections(St0, Topic, MaxPartitions) of {ok, St} -> Result = do_get_leader_connections(St, Topic), {reply, {ok, Result}, St}; @@ -180,8 +189,8 @@ handle_info(_Info, St) -> handle_cast(Cast, #{connect := _Fun} = St) -> handle_cast(Cast, upgrade(St)); -handle_cast({recv_leader_connection, Topic, Partition, Caller}, St0) -> - case ensure_leader_connections(St0, Topic) of +handle_cast({recv_leader_connection, Topic, Partition, Caller, MaxPartitions}, St0) -> + case ensure_leader_connections(St0, Topic, MaxPartitions) of {ok, St} -> Partitions = do_get_leader_connections(St, Topic), %% the Partition in argument is a result of ensure_leader_connections @@ -258,20 +267,21 @@ is_metadata_fresh(#{metadata_ts := Topics, config := Config}, Topic) -> Ts -> timer:now_diff(erlang:timestamp(), Ts) < MinInterval * 
1000 end. --spec ensure_leader_connections(state(), topic()) -> +-spec ensure_leader_connections(state(), topic(), all_partitions | pos_integer()) -> {ok, state()} | {error, any()}. -ensure_leader_connections(St, Topic) -> +ensure_leader_connections(St, Topic, MaxPartitions) -> case is_metadata_fresh(St, Topic) of true -> {ok, St}; - false -> do_ensure_leader_connections(St, Topic) + false -> do_ensure_leader_connections(St, Topic, MaxPartitions) end. do_ensure_leader_connections(#{conn_config := ConnConfig, seed_hosts := SeedHosts, metadata_ts := MetadataTs - } = St0, Topic) -> + } = St0, Topic, MaxPartitions) -> case get_metadata(SeedHosts, ConnConfig, Topic) of - {ok, {Brokers, PartitionMetaList}} -> + {ok, {Brokers, PartitionMetaList0}} -> + PartitionMetaList = limit_partitions_count(PartitionMetaList0, MaxPartitions), St = lists:foldl(fun(PartitionMeta, StIn) -> ensure_leader_connection(StIn, Brokers, Topic, PartitionMeta) end, St0, PartitionMetaList), @@ -281,6 +291,13 @@ do_ensure_leader_connections(#{conn_config := ConnConfig, {error, failed_to_fetch_metadata} end. +%% Keep at most Max entries of the partition metadata list; `all_partitions' means no limit. +%% lists:sublist/2 returns the whole list when Max is not smaller than its length. +limit_partitions_count(PartitionMetaList, Max) when is_integer(Max) -> + lists:sublist(PartitionMetaList, Max); +limit_partitions_count(PartitionMetaList, all_partitions) -> + PartitionMetaList. + %% This function ensures each Topic-Partition pair has a connection record %% either a pid when the leader is healthy, or the error reason %% if failed to discover the leader or failed to connect to the leader diff --git a/src/wolff_producer.erl b/src/wolff_producer.erl index 372ba77..dc507ec 100644 --- a/src/wolff_producer.erl +++ b/src/wolff_producer.erl @@ -47,7 +47,8 @@ max_send_ahead | compression | drop_if_highmem | - telemetry_meta_data. + telemetry_meta_data | + max_partitions. 
-type config() :: #{replayq_dir := string(), replayq_max_total_bytes => pos_integer(), @@ -61,7 +62,8 @@ compression => kpro:compress_option(), drop_if_highmem => boolean(), telemetry_meta_data => map(), - enable_global_stats => boolean() + enable_global_stats => boolean(), + max_partitions => pos_integer() }. -define(no_timer, no_timer). @@ -590,13 +592,14 @@ log_connection_down(Topic, Partition, Conn, Reason) -> "connection_to_partition_leader_error", #{conn => Conn, reason => Reason}). -ensure_delayed_reconnect(#{config := #{reconnect_delay_ms := Delay0}, +ensure_delayed_reconnect(#{config := #{reconnect_delay_ms := Delay0} = Config, client_id := ClientId, topic := Topic, partition := Partition, reconnect_timer := ?no_timer } = St, DelayStrategy) -> Attempts = maps:get(reconnect_attempts, St, 0), + MaxPartitions = maps:get(max_partitions, Config, all_partitions), Attempts > 0 andalso Attempts rem 10 =:= 0 andalso log_error(Topic, Partition, "producer_is_still_disconnected_after_retry", @@ -613,7 +616,7 @@ ensure_delayed_reconnect(#{config := #{reconnect_delay_ms := Delay0}, end, case wolff_client_sup:find_client(ClientId) of {ok, ClientPid} -> - Args = [ClientPid, Topic, Partition, self()], + Args = [ClientPid, Topic, Partition, self(), MaxPartitions], {ok, Tref} = timer:apply_after(Delay, wolff_client, recv_leader_connection, Args), St#{reconnect_timer => Tref, reconnect_attempts => Attempts + 1}; {error, Reason} -> diff --git a/src/wolff_producers.erl b/src/wolff_producers.erl index 345c482..425e254 100644 --- a/src/wolff_producers.erl +++ b/src/wolff_producers.erl @@ -20,6 +20,7 @@ -export([start_linked_producers/3, stop_linked/1]). -export([start_supervised/3, stop_supervised/1, stop_supervised/2]). -export([pick_producer/2, lookup_producer/2, cleanup_workers_table/2]). +-export([find_producers_by_client_topic/2]). %% gen_server callbacks -export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]). 
@@ -61,6 +62,7 @@ -define(partition_count_refresh_interval_seconds, 300). -define(refresh_partition_count, refresh_partition_count). -define(partition_count_unavailable, -1). +-define(all_partitions, all_partitions). %% @doc Called by wolff_producers_sup to start wolff_producers process. start_link(ClientId, Topic, Config) -> @@ -83,7 +85,8 @@ start_linked_producers(ClientPid, Topic, ProducerCfg) when is_pid(ClientPid) -> start_linked_producers(ClientId, ClientPid, Topic, ProducerCfg). start_linked_producers(ClientId, ClientPid, Topic, ProducerCfg) -> - case wolff_client:get_leader_connections(ClientPid, Topic) of + MaxPartitions = maps:get(max_partitions, ProducerCfg, ?all_partitions), + case wolff_client:get_leader_connections(ClientPid, Topic, MaxPartitions) of {ok, Connections} -> Workers = start_link_producers(ClientId, Topic, Connections, ProducerCfg), ok = put_partition_cnt(ClientId, Topic, maps:size(Workers)), @@ -416,8 +419,9 @@ refresh_partition_count(#{client_pid := Pid} = St) when not is_pid(Pid) -> refresh_partition_count(#{producers_status := ?not_initialized} = St) -> %% to be initialized St; -refresh_partition_count(#{client_pid := Pid, topic := Topic} = St) -> - case wolff_client:get_leader_connections(Pid, Topic) of +refresh_partition_count(#{client_pid := Pid, topic := Topic, config := Config} = St) -> + MaxPartitions = maps:get(max_partitions, Config, ?all_partitions), + case wolff_client:get_leader_connections(Pid, Topic, MaxPartitions) of {ok, Connections} -> start_new_producers(St, Connections); {error, Reason} -> diff --git a/test/wolff_supervised_tests.erl b/test/wolff_supervised_tests.erl index f3a339d..def9e3a 100644 --- a/test/wolff_supervised_tests.erl +++ b/test/wolff_supervised_tests.erl @@ -123,6 +123,30 @@ test_client_restart(ClientId, Topic, Partition) -> wolff_tests:deinstall_event_logging(?FUNCTION_NAME), ok. 
+max_partitions_test() -> + ClientId = <<"max-partitions-test">>, + Topic = <<"test-topic-2">>, + Partition = 0, + MaxPartitions = 1, + _ = application:stop(wolff), %% ensure stopped + {ok, _} = application:ensure_all_started(wolff), + ClientCfg = #{connection_strategy => per_partition}, + {ok, _ClientPid} = wolff:ensure_supervised_client(ClientId, ?HOSTS, ClientCfg), + ProducerCfg = #{required_acks => all_isr, + partitioner => Partition, + max_partitions => MaxPartitions + }, + {ok, Producers} = wolff:ensure_supervised_producers(ClientId, Topic, ProducerCfg), + %% the topic has two partitions, but max_partitions = 1 limits it to one started producer + ?assertMatch([_], wolff_producers:find_producers_by_client_topic(ClientId, Topic)), + %% cleanup + ok = wolff:stop_and_delete_supervised_producers(Producers), + ?assertEqual([], supervisor:which_children(wolff_producers_sup)), + ok = wolff:stop_and_delete_supervised_client(ClientId), + ?assertEqual([], supervisor:which_children(wolff_client_sup)), + ok = application:stop(wolff), + ok. + %% Test against a bad host. %% No connection will be established at all. %% Producer workers should not crash, async APIs should work.