From b7c6cf187cd8593bf686c4507eac5a1151186e2a Mon Sep 17 00:00:00 2001
From: Thales Macedo Garitezi
Date: Tue, 25 Jun 2024 10:41:30 -0300
Subject: [PATCH] feat: make client share connections to the same topic
 between different aliases

---
 src/wolff_client.erl            | 89 ++++++++++++++++++++++++---------
 src/wolff_producers.erl         |  8 +--
 test/wolff_supervised_tests.erl | 46 +++++++++++++++++
 3 files changed, 116 insertions(+), 27 deletions(-)

diff --git a/src/wolff_client.erl b/src/wolff_client.erl
index aa5243d..4d0e027 100644
--- a/src/wolff_client.erl
+++ b/src/wolff_client.erl
@@ -51,11 +51,13 @@
         conn_config := kpro:conn_config(),
         conns := #{conn_id() => connection()},
         metadata_conn := pid() | not_initialized | down,
-        metadata_ts := #{topic_or_alias() => erlang:timestamp()},
+        metadata_ts := #{topic() => erlang:timestamp()},
         %% only applicable when connection strategy is per_broker
         %% because in this case the connections are keyed by host()
         %% but we need to find connection by {topic(), partition()}
-        leaders => #{{topic_or_alias(), partition()} => connection()}
+        leaders => #{{topic(), partition()} => connection()},
+        %% Reference counting so we may drop connection metadata when no longer required.
+        known_topics := #{topic() => #{topic_or_alias() => true}}
        }.
 
 -define(DEFAULT_METADATA_TIMEOUT, 10000).
@@ -83,7 +85,8 @@ start_link(ClientId, Hosts, Config) ->
          conns => #{},
          metadata_conn => not_initialized,
          metadata_ts => #{},
-         leaders => #{}
+         leaders => #{},
+         known_topics => #{}
         },
   case maps:get(reg_name, Config, false) of
     false -> gen_server:start_link(?MODULE, St, []);
@@ -211,11 +214,29 @@ handle_cast({recv_leader_connection, Topic, Partition, Caller, MaxConnections},
       _ = erlang:send(Caller, ?leader_connection({error, Reason})),
       {noreply, St0}
   end;
-
-handle_cast({delete_producers_metadata, TopicOrAlias}, #{metadata_ts := Topics, conns := Conns} = St) ->
-  Conns1 = maps:without( [K || K = {K1, _} <- maps:keys(Conns), K1 =:= TopicOrAlias ], Conns),
-  {noreply, St#{metadata_ts => maps:remove(TopicOrAlias, Topics), conns => Conns1}};
-
+handle_cast({delete_producers_metadata, TopicOrAlias}, St0) ->
+  #{metadata_ts := Topics0,
+    conns := Conns0,
+    known_topics := KnownTopics0} = St0,
+  Topic = get_topic(TopicOrAlias),
+  case KnownTopics0 of
+    #{Topic := #{TopicOrAlias := true} = KnownProducers} when map_size(KnownProducers) =:= 1 ->
+      %% Last alias for this topic: we may drop the connection metadata.
+      KnownTopics = maps:remove(Topic, KnownTopics0),
+      Conns = maps:without([K || K = {K1, _} <- maps:keys(Conns0), K1 =:= Topic], Conns0),
+      Topics = maps:remove(Topic, Topics0),
+      St = St0#{metadata_ts := Topics, conns := Conns, known_topics := KnownTopics},
+      {noreply, St};
+    #{Topic := #{TopicOrAlias := true} = KnownProducers0} ->
+      %% The connection is still being used by other producers.
+      KnownProducers = maps:remove(TopicOrAlias, KnownProducers0),
+      KnownTopics = KnownTopics0#{Topic := KnownProducers},
+      St = St0#{known_topics := KnownTopics},
+      {noreply, St};
+    _ ->
+      %% Already gone; nothing to do.
+      {noreply, St0}
+  end;
 handle_cast(_Cast, St) ->
   {noreply, St}.
@@ -274,11 +295,12 @@ do_close_connection(Pid) ->
   ...
   end.
 do_get_leader_connections(#{conns := Conns} = St, TopicOrAlias) ->
+  Topic = get_topic(TopicOrAlias),
   FindInMap = case get_connection_strategy(St) of
                 per_partition -> Conns;
                 per_broker -> maps:get(leaders, St)
               end,
-  F = fun({T, P}, MaybePid, Acc) when T =:= TopicOrAlias ->
+  F = fun({T, P}, MaybePid, Acc) when T =:= Topic ->
           case is_alive(MaybePid) of
             true ->
               [{P, MaybePid} | Acc];
@@ -302,14 +324,18 @@ is_metadata_fresh(#{metadata_ts := Topics, config := Config}, TopicOrAlias) ->
 -spec ensure_leader_connections(state(), topic_or_alias(), all_partitions | pos_integer()) ->
   {ok, state()} | {error, any()}.
 ensure_leader_connections(St, TopicOrAlias, MaxPartitions) ->
-  case is_metadata_fresh(St, TopicOrAlias) of
+  Topic = get_topic(TopicOrAlias),
+  case is_metadata_fresh(St, Topic) of
     true -> {ok, St};
     false -> ensure_leader_connections2(St, TopicOrAlias, MaxPartitions)
   end.
 
+-spec ensure_leader_connections2(state(), topic_or_alias(), wolff_producers:max_partitions()) ->
+  {ok, state()} | {error, term()}.
 ensure_leader_connections2(#{metadata_conn := Pid, conn_config := ConnConfig} = St, TopicOrAlias, MaxPartitions) when is_pid(Pid) ->
+  Topic = get_topic(TopicOrAlias),
   Timeout = maps:get(request_timeout, ConnConfig, ?DEFAULT_METADATA_TIMEOUT),
-  case do_get_metadata(Pid, TopicOrAlias, Timeout) of
+  case do_get_metadata(Pid, Topic, Timeout) of
     {ok, {Brokers, PartitionMetaList}} ->
       ensure_leader_connections3(St, TopicOrAlias, Pid, Brokers, PartitionMetaList, MaxPartitions);
     {error, _Reason} ->
@@ -320,7 +346,8 @@ ensure_leader_connections2(#{conn_config := ConnConfig,
   end;
 ensure_leader_connections2(#{conn_config := ConnConfig,
                              seed_hosts := SeedHosts} = St, TopicOrAlias, MaxPartitions) ->
-  case get_metadata(SeedHosts, ConnConfig, TopicOrAlias, []) of
+  Topic = get_topic(TopicOrAlias),
+  case get_metadata(SeedHosts, ConnConfig, Topic, []) of
     {ok, {ConnPid, {Brokers, PartitionMetaList}}} ->
       ensure_leader_connections3(St, TopicOrAlias, ConnPid, Brokers, PartitionMetaList, MaxPartitions);
     {error, Errors} ->
@@ -328,6 +355,9 @@ ensure_leader_connections2(#{conn_config := ConnConfig,
       {error, failed_to_fetch_metadata}
   end.
 
+-spec ensure_leader_connections3(state(), topic_or_alias(), pid(), _Brokers,
+                                 _PartitionMetaList, wolff_producers:max_partitions()) ->
+  {ok, state()}.
 ensure_leader_connections3(#{metadata_ts := MetadataTs} = St0, TopicOrAlias, ConnPid,
                            Brokers, PartitionMetaList0, MaxPartitions) ->
   PartitionMetaList = limit_partitions_count(PartitionMetaList0, MaxPartitions),
@@ -346,6 +376,7 @@ limit_partitions_count(PartitionMetaList, _) ->
 %% This function ensures each Topic-Partition pair has a connection record
 %% either a pid when the leader is healthy, or the error reason
 %% if failed to discover the leader or failed to connect to the leader
+-spec ensure_leader_connection(state(), _Brokers, topic_or_alias(), _PartitionMetaList) -> state().
 ensure_leader_connection(St, Brokers, TopicOrAlias, P_Meta) ->
   PartitionNum = kpro:find(partition_index, P_Meta),
   ErrorCode = kpro:find(error_code, P_Meta),
@@ -356,14 +387,17 @@ ensure_leader_connection(St, Brokers, TopicOrAlias, P_Meta) ->
       maybe_disconnect_old_leader(St, TopicOrAlias, PartitionNum, ErrorCode)
   end.
 
+-spec do_ensure_leader_connection(state(), _Brokers, topic_or_alias(), _Partition, _PartitionMetaList) ->
+  state().
 do_ensure_leader_connection(#{conn_config := ConnConfig,
                               conns := Connections0
                              } = St0, Brokers, TopicOrAlias, PartitionNum, P_Meta) ->
+  Topic = get_topic(TopicOrAlias),
   LeaderBrokerId = kpro:find(leader_id, P_Meta),
   {_, Host} = lists:keyfind(LeaderBrokerId, 1, Brokers),
   Strategy = get_connection_strategy(St0),
   ConnId = case Strategy of
-             per_partition -> {TopicOrAlias, PartitionNum};
+             per_partition -> {Topic, PartitionNum};
             per_broker -> Host
           end,
   Connections =
@@ -381,7 +415,7 @@ do_ensure_leader_connection(#{conn_config := ConnConfig,
   Leaders0 = maps:get(leaders, St0, #{}),
   case Strategy of
     per_broker ->
-      Leaders = Leaders0#{{TopicOrAlias, PartitionNum} => maps:get(ConnId, Connections)},
+      Leaders = Leaders0#{{Topic, PartitionNum} => maps:get(ConnId, Connections)},
       St#{leaders => Leaders};
     _ ->
       St
@@ -389,11 +423,12 @@
 %% Handle error code in partition metadata.
 maybe_disconnect_old_leader(#{conns := Connections} = St, TopicOrAlias, PartitionNum, ErrorCode) ->
+  Topic = get_topic(TopicOrAlias),
   Strategy = get_connection_strategy(St),
   case Strategy of
     per_partition ->
       %% partition metadata has error code, there is no need to keep the old connection alive
-      ConnId = {TopicOrAlias, PartitionNum},
+      ConnId = {Topic, PartitionNum},
       MaybePid = maps:get(ConnId, Connections, false),
       is_pid(MaybePid) andalso close_connection(MaybePid),
       St#{conns := add_conn({error, ErrorCode}, ConnId, Connections)};
@@ -404,7 +439,7 @@ maybe_disconnect_old_leader(#{conns := Connections} = St, TopicOrAlias, Partitio
       %% (due to the error code returned) there is no way to know which
      %% connection to close anyway.
       Leaders0 = maps:get(leaders, St, #{}),
-      Leaders = Leaders0#{{TopicOrAlias, PartitionNum} => ErrorCode},
+      Leaders = Leaders0#{{Topic, PartitionNum} => ErrorCode},
       St#{leaders => Leaders}
   end.
@@ -450,18 +485,22 @@ split_config(Config) ->
   {ConnCfg, MyCfg} = lists:partition(Pred, maps:to_list(Config)),
   {maps:from_list(ConnCfg), maps:from_list(MyCfg)}.
 
+-spec get_metadata([_Host], _ConnConfig, topic()) ->
+  {ok, {pid(), term()}} | {error, term()}.
 get_metadata(Hosts, _ConnectFun, _Topic) when Hosts =:= [] ->
   {error, no_hosts};
 get_metadata(Hosts, ConnectFun, Topic) ->
   get_metadata(Hosts, ConnectFun, Topic, []).
 
-get_metadata([], _ConnectFun, _TopicOrAlias, Errors) ->
+-spec get_metadata([_Host], _ConnConfig, topic(), [Error]) ->
+  {ok, {pid(), term()}} | {error, [Error] | term()}.
+get_metadata([], _ConnConfig, _Topic, Errors) ->
   {error, Errors};
-get_metadata([Host | Rest], ConnConfig, TopicOrAlias, Errors) ->
+get_metadata([Host | Rest], ConnConfig, Topic, Errors) ->
   case do_connect(Host, ConnConfig) of
     {ok, Pid} ->
       Timeout = maps:get(request_timeout, ConnConfig, ?DEFAULT_METADATA_TIMEOUT),
-      case do_get_metadata(Pid, TopicOrAlias, Timeout) of
+      case do_get_metadata(Pid, Topic, Timeout) of
         {ok, Result} ->
           {ok, {Pid, Result}};
         {error, Reason} ->
@@ -470,20 +509,22 @@ get_metadata([Host | Rest], ConnConfig, TopicOrAlias, Errors) ->
           {error, Reason}
       end;
     {error, Reason} ->
-      get_metadata(Rest, ConnConfig, TopicOrAlias, [Reason | Errors])
+      get_metadata(Rest, ConnConfig, Topic, [Reason | Errors])
   end.
 
-do_get_metadata(Connection, TopicOrAlias, Timeout) ->
+-spec do_get_metadata(connection(), topic(), timeout()) ->
+  {ok, {_Brokers, _Partitions}} | {error, term()}.
+do_get_metadata(Connection, Topic, Timeout) ->
   case kpro:get_api_versions(Connection) of
     {ok, Vsns} ->
       {_, Vsn} = maps:get(metadata, Vsns),
-      do_get_metadata2(Vsn, Connection, TopicOrAlias, Timeout);
+      do_get_metadata2(Vsn, Connection, Topic, Timeout);
     {error, Reason} ->
       {error, Reason}
   end.
 
-do_get_metadata2(Vsn, Connection, TopicOrAlias, Timeout) ->
-  Topic = get_topic(TopicOrAlias),
+-spec do_get_metadata2(_Vsn, connection(), topic(), timeout()) -> {ok, {_, _}} | {error, term()}.
+do_get_metadata2(Vsn, Connection, Topic, Timeout) ->
   Req = kpro_req_lib:metadata(Vsn, [Topic], _IsAutoCreateAllowed = false),
   case kpro:request_sync(Connection, Req, Timeout) of
     {ok, #kpro_rsp{msg = Meta}} ->
diff --git a/src/wolff_producers.erl b/src/wolff_producers.erl
index 2c584c3..0a53d69 100644
--- a/src/wolff_producers.erl
+++ b/src/wolff_producers.erl
@@ -28,7 +28,7 @@
 %% tests
 -export([find_producer_by_partition/3]).
 
--export_type([producers/0, config/0, partitioner/0, producer_alias/0, topic_or_alias/0, alias_and_topic/0]).
+-export_type([producers/0, config/0, partitioner/0, producer_alias/0, topic_or_alias/0, alias_and_topic/0, max_partitions/0]).
 
 -include("wolff.hrl").
 -include_lib("stdlib/include/ms_transform.hrl").
@@ -41,7 +41,7 @@
        }.
 
 -type producer_alias() :: binary().
--type alias_and_topic() :: {producer_alias(), topic()}.
+-type alias_and_topic() :: {producer_alias() | undefined, topic()}.
 -type topic() :: kpro:topic().
 -type topic_or_alias() :: topic() | alias_and_topic().
 -type partition() :: kpro:partition().
@@ -67,6 +67,8 @@
 -define(partition_count_unavailable, -1).
 -define(all_partitions, all_partitions).
 
+-type max_partitions() :: pos_integer() | ?all_partitions.
+
 %% @doc Called by wolff_producers_sup to start wolff_producers process.
 start_link(ClientId, TopicOrAlias, Config) ->
   Name = maps:get(name, Config, <<>>),
@@ -100,7 +102,7 @@ start_linked_producers(ClientId, ClientPid, TopicOrAlias, ProducerCfg) ->
     end,
   case wolff_client:get_leader_connections(ClientPid, AliasTopic, MaxPartitions) of
     {ok, Connections} ->
-      Workers = start_link_producers(ClientId, TopicOrAlias, Connections, ProducerCfg),
+      Workers = start_link_producers(ClientId, AliasTopic, Connections, ProducerCfg),
       ok = put_partition_cnt(ClientId, AliasTopic, maps:size(Workers)),
       Partitioner = maps:get(partitioner, ProducerCfg, random),
       {ok, #{client_id => ClientId,
diff --git a/test/wolff_supervised_tests.erl b/test/wolff_supervised_tests.erl
index ae170ef..b3c1bd5 100644
--- a/test/wolff_supervised_tests.erl
+++ b/test/wolff_supervised_tests.erl
@@ -75,6 +75,52 @@ test_supervised_producers(Name) ->
   wolff_tests:deinstall_event_logging(?FUNCTION_NAME),
   ok.
 
+%% Checks that having different producers to the same kafka topic works.
+different_producers_same_topic_test() ->
+  _ = application:stop(wolff), %% ensure stopped
+  {ok, _} = application:ensure_all_started(wolff),
+  ClientId = <<"same-topic">>,
+  ClientCfg = client_config(),
+  {ok, _ClientPid} = wolff:ensure_supervised_client(ClientId, ?HOSTS, ClientCfg),
+  Topic = <<"test-topic">>,
+  ProducerName0 = <<"p0">>,
+  Alias0 = <<"a0">>,
+  ProducerCfg0 = (producer_config(ProducerName0))#{required_acks => all_isr, alias => Alias0},
+  {ok, Producers0} = wolff:ensure_supervised_producers(ClientId, Topic, ProducerCfg0),
+  ?assertEqual({ok, Producers0},
+               wolff:ensure_supervised_producers(ClientId, <<"test-topic">>, ProducerCfg0)),
+  ProducerName1 = <<"p1">>,
+  Alias1 = <<"a1">>,
+  ProducerCfg1 = (producer_config(ProducerName1))#{required_acks => all_isr, alias => Alias1},
+  {ok, Producers1} = wolff:ensure_supervised_producers(ClientId, Topic, ProducerCfg1),
+  ?assertEqual({ok, Producers0},
+               wolff:ensure_supervised_producers(ClientId, <<"test-topic">>, ProducerCfg0)),
+  %% Each replayq dir should be namespaced by the alias.
+  #{replayq_dir := BaseDir} = ProducerCfg0,
+  {ok, Files} = file:list_dir(BaseDir),
+  ReplayQDirs = [File || File <- Files,
+                         filelib:is_dir(filename:join([BaseDir, File])),
+                         lists:member(list_to_binary(File), [Alias0, Alias1])],
+  ?assertMatch([_, _], ReplayQDirs, #{base_dir => Files}),
+  %% We can send from each producer.
+  Msg = #{key => ?KEY, value => <<"value">>},
+  Self = self(),
+  AckFun = fun(_Partition, _BaseOffset) -> Self ! acked, ok end,
+  {_Partition0, _ProducerPid0} = wolff:send(Producers0, [Msg], AckFun),
+  receive acked -> ok end,
+  {_Partition1A, _ProducerPid1A} = wolff:send(Producers1, [Msg], AckFun),
+  receive acked -> ok end,
+  %% We now stop one of the producers. The other should keep working.
+  ok = wolff:stop_and_delete_supervised_producers(Producers0),
+  %% Some time for `wolff_client:delete_producers_metadata' to be processed.
+  timer:sleep(100),
+  {_Partition1B, _ProducerPid1B} = wolff:send(Producers1, [Msg], AckFun),
+  receive acked -> ok end,
+  ok = wolff:stop_and_delete_supervised_producers(Producers1),
+  ok = wolff:stop_and_delete_supervised_client(ClientId),
+  ok = application:stop(wolff),
+  ok.
+
 client_restart_test() ->
   ClientId = <<"client-restart-test">>,
   Topic = <<"test-topic">>,
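
Editor's note, not part of the patch: the sketch below illustrates the caller-side behavior this change enables, using only APIs exercised by the test above. Two supervised producer sets with distinct aliases point at the same topic; the client keeps a single set of per-topic connections and reference-counts the aliases in `known_topics`, dropping the connection metadata only when the last alias is deleted. The client id, host list, and config values are hypothetical placeholders.

    %% Minimal sketch, assuming a reachable Kafka broker at localhost:9092.
    {ok, _} = application:ensure_all_started(wolff),
    {ok, _Client} = wolff:ensure_supervised_client(<<"c1">>, [{"localhost", 9092}], #{}),
    %% Two aliases ("a0"/"a1") on the same topic share the client's connections.
    {ok, P0} = wolff:ensure_supervised_producers(<<"c1">>, <<"test-topic">>,
                                                 #{name => <<"p0">>, alias => <<"a0">>}),
    {ok, P1} = wolff:ensure_supervised_producers(<<"c1">>, <<"test-topic">>,
                                                 #{name => <<"p1">>, alias => <<"a1">>}),
    %% Deleting one alias only decrements the known_topics entry for the topic;
    %% the remaining producer set keeps publishing over the shared connections.
    ok = wolff:stop_and_delete_supervised_producers(P0),
    {_Partition, _Pid} = wolff:send(P1, [#{key => <<"k">>, value => <<"v">>}],
                                    fun(_P, _O) -> ok end).

As the test asserts, the alias only namespaces per-producer state such as the replayq directories; connection metadata inside the client remains keyed by the real topic, which is what makes the sharing safe.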