Skip to content

Commit

Permalink
feat: make client share connections to the same topic between different aliases
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed Jun 25, 2024
1 parent 8b80ed4 commit b7c6cf1
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 27 deletions.
89 changes: 65 additions & 24 deletions src/wolff_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@
conn_config := kpro:conn_config(),
conns := #{conn_id() => connection()},
metadata_conn := pid() | not_initialized | down,
metadata_ts := #{topic_or_alias() => erlang:timestamp()},
metadata_ts := #{topic() => erlang:timestamp()},
%% only applicable when connection strategy is per_broker
%% because in this case the connections are keyed by host()
%% but we need to find connection by {topic(), partition()}
leaders => #{{topic_or_alias(), partition()} => connection()}
leaders => #{{topic(), partition()} => connection()},
%% Reference counting so we may drop connection metadata when no longer required.
known_topics := #{topic() => #{topic_or_alias() => true}}
}.

-define(DEFAULT_METADATA_TIMEOUT, 10000).
Expand Down Expand Up @@ -83,7 +85,8 @@ start_link(ClientId, Hosts, Config) ->
conns => #{},
metadata_conn => not_initialized,
metadata_ts => #{},
leaders => #{}
leaders => #{},
known_topics => #{}
},
case maps:get(reg_name, Config, false) of
false -> gen_server:start_link(?MODULE, St, []);
Expand Down Expand Up @@ -211,11 +214,29 @@ handle_cast({recv_leader_connection, Topic, Partition, Caller, MaxConnections},
_ = erlang:send(Caller, ?leader_connection({error, Reason})),
{noreply, St0}
end;

handle_cast({delete_producers_metadata, TopicOrAlias}, #{metadata_ts := Topics, conns := Conns} = St) ->
Conns1 = maps:without( [K || K = {K1, _} <- maps:keys(Conns), K1 =:= TopicOrAlias ], Conns),
{noreply, St#{metadata_ts => maps:remove(TopicOrAlias, Topics), conns => Conns1}};

handle_cast({delete_producers_metadata, TopicOrAlias}, St0) ->
#{metadata_ts := Topics0,
conns := Conns0,
known_topics := KnownTopics0} = St0,
Topic = get_topic(TopicOrAlias),
case KnownTopics0 of
#{Topic := #{TopicOrAlias := true} = KnownProducers} when map_size(KnownProducers) =:= 1 ->
%% Last entry: we may drop the connection metadata
KnownTopics = maps:remove(Topic, KnownTopics0),
Conns = maps:without( [K || K = {K1, _} <- maps:keys(Conns0), K1 =:= Topic], Conns0),
Topics = maps:remove(TopicOrAlias, Topics0),
St = St0#{metadata_ts := Topics, conns := Conns, known_topics := KnownTopics},
{noreply, St};
#{Topic := #{TopicOrAlias := true} = KnownProducers0} ->
%% Connection is still being used by other producers.
KnownProducers = maps:remove(TopicOrAlias, KnownProducers0),
KnownTopics = KnownTopics0#{Topic := KnownProducers},
St = St0#{known_topics := KnownTopics},
{noreply, St};
_ ->
%% Already gone; nothing to do.
{noreply, St0}
end;
handle_cast(_Cast, St) ->
{noreply, St}.

Expand Down Expand Up @@ -274,11 +295,12 @@ do_close_connection(Pid) ->
end.

do_get_leader_connections(#{conns := Conns} = St, TopicOrAlias) ->
Topic = get_topic(TopicOrAlias),
FindInMap = case get_connection_strategy(St) of
per_partition -> Conns;
per_broker -> maps:get(leaders, St)
end,
F = fun({T, P}, MaybePid, Acc) when T =:= TopicOrAlias ->
F = fun({T, P}, MaybePid, Acc) when T =:= Topic ->
case is_alive(MaybePid) of
true ->
[{P, MaybePid} | Acc];
Expand All @@ -302,14 +324,18 @@ is_metadata_fresh(#{metadata_ts := Topics, config := Config}, TopicOrAlias) ->
-spec ensure_leader_connections(state(), topic_or_alias(), all_partitions | pos_integer()) ->
{ok, state()} | {error, any()}.
ensure_leader_connections(St, TopicOrAlias, MaxPartitions) ->
case is_metadata_fresh(St, TopicOrAlias) of
Topic = get_topic(TopicOrAlias),
case is_metadata_fresh(St, Topic) of
true -> {ok, St};
false -> ensure_leader_connections2(St, TopicOrAlias, MaxPartitions)
end.

-spec ensure_leader_connections2(state(), topic_or_alias(), wolff_producers:max_partitions()) ->
{ok, state()} | {error, term()}.
ensure_leader_connections2(#{metadata_conn := Pid, conn_config := ConnConfig} = St, TopicOrAlias, MaxPartitions) when is_pid(Pid) ->
Topic = get_topic(TopicOrAlias),
Timeout = maps:get(request_timeout, ConnConfig, ?DEFAULT_METADATA_TIMEOUT),
case do_get_metadata(Pid, TopicOrAlias, Timeout) of
case do_get_metadata(Pid, Topic, Timeout) of
{ok, {Brokers, PartitionMetaList}} ->
ensure_leader_connections3(St, TopicOrAlias, Pid, Brokers, PartitionMetaList, MaxPartitions);
{error, _Reason} ->
Expand All @@ -320,14 +346,18 @@ ensure_leader_connections2(#{metadata_conn := Pid, conn_config := ConnConfig} =
end;
ensure_leader_connections2(#{conn_config := ConnConfig,
seed_hosts := SeedHosts} = St, TopicOrAlias, MaxPartitions) ->
case get_metadata(SeedHosts, ConnConfig, TopicOrAlias, []) of
Topic = get_topic(TopicOrAlias),
case get_metadata(SeedHosts, ConnConfig, Topic, []) of
{ok, {ConnPid, {Brokers, PartitionMetaList}}} ->
ensure_leader_connections3(St, TopicOrAlias, ConnPid, Brokers, PartitionMetaList, MaxPartitions);
{error, Errors} ->
log_warn(failed_to_fetch_metadata, #{topic => get_topic(TopicOrAlias), errors => Errors}),
{error, failed_to_fetch_metadata}
end.

-spec ensure_leader_connections3(state(), topic_or_alias(), pid(), _Brokers,
_PartitionMetaList, wolff_producers:max_partitions()) ->
{ok, state()}.
ensure_leader_connections3(#{metadata_ts := MetadataTs} = St0, TopicOrAlias,
ConnPid, Brokers, PartitionMetaList0, MaxPartitions) ->
PartitionMetaList = limit_partitions_count(PartitionMetaList0, MaxPartitions),
Expand All @@ -346,6 +376,7 @@ limit_partitions_count(PartitionMetaList, _) ->
%% This function ensures each Topic-Partition pair has a connection record
%% either a pid when the leader is healthy, or the error reason
%% if failed to discover the leader or failed to connect to the leader
-spec ensure_leader_connection(state(), _Brokers, topic_or_alias(), _PartitionMetaList) -> state().
ensure_leader_connection(St, Brokers, TopicOrAlias, P_Meta) ->
PartitionNum = kpro:find(partition_index, P_Meta),
ErrorCode = kpro:find(error_code, P_Meta),
Expand All @@ -356,14 +387,17 @@ ensure_leader_connection(St, Brokers, TopicOrAlias, P_Meta) ->
maybe_disconnect_old_leader(St, TopicOrAlias, PartitionNum, ErrorCode)
end.

-spec do_ensure_leader_connection(state(), _Brokers, topic_or_alias(), _Partition, _PartitionMetaList) ->
state().
do_ensure_leader_connection(#{conn_config := ConnConfig,
conns := Connections0
} = St0, Brokers, TopicOrAlias, PartitionNum, P_Meta) ->
Topic = get_topic(TopicOrAlias),
LeaderBrokerId = kpro:find(leader_id, P_Meta),
{_, Host} = lists:keyfind(LeaderBrokerId, 1, Brokers),
Strategy = get_connection_strategy(St0),
ConnId = case Strategy of
per_partition -> {TopicOrAlias, PartitionNum};
per_partition -> {Topic, PartitionNum};
per_broker -> Host
end,
Connections =
Expand All @@ -381,19 +415,20 @@ do_ensure_leader_connection(#{conn_config := ConnConfig,
Leaders0 = maps:get(leaders, St0, #{}),
case Strategy of
per_broker ->
Leaders = Leaders0#{{TopicOrAlias, PartitionNum} => maps:get(ConnId, Connections)},
Leaders = Leaders0#{{Topic, PartitionNum} => maps:get(ConnId, Connections)},
St#{leaders => Leaders};
_ ->
St
end.

%% Handle error code in partition metadata.
maybe_disconnect_old_leader(#{conns := Connections} = St, TopicOrAlias, PartitionNum, ErrorCode) ->
Topic = get_topic(TopicOrAlias),
Strategy = get_connection_strategy(St),
case Strategy of
per_partition ->
%% partition metadata has error code, there is no need to keep the old connection alive
ConnId = {TopicOrAlias, PartitionNum},
ConnId = {Topic, PartitionNum},
MaybePid = maps:get(ConnId, Connections, false),
is_pid(MaybePid) andalso close_connection(MaybePid),
St#{conns := add_conn({error, ErrorCode}, ConnId, Connections)};
Expand All @@ -404,7 +439,7 @@ maybe_disconnect_old_leader(#{conns := Connections} = St, TopicOrAlias, Partitio
%% (due to the error code returned) there is no way to know which
%% connection to close anyway.
Leaders0 = maps:get(leaders, St, #{}),
Leaders = Leaders0#{{TopicOrAlias, PartitionNum} => ErrorCode},
Leaders = Leaders0#{{Topic, PartitionNum} => ErrorCode},
St#{leaders => Leaders}
end.

Expand Down Expand Up @@ -450,18 +485,22 @@ split_config(Config) ->
{ConnCfg, MyCfg} = lists:partition(Pred, maps:to_list(Config)),
{maps:from_list(ConnCfg), maps:from_list(MyCfg)}.

-spec get_metadata([_Host], _ConnConfig, topic()) ->
{ok, {pid(), term()}} | {error, term()}.
get_metadata(Hosts, _ConnectFun, _Topic) when Hosts =:= [] ->
{error, no_hosts};
get_metadata(Hosts, ConnectFun, Topic) ->
get_metadata(Hosts, ConnectFun, Topic, []).

get_metadata([], _ConnectFun, _TopicOrAlias, Errors) ->
-spec get_metadata([_Host], _ConnConfig, topic(), [Error]) ->
{ok, {pid(), term()}} | {error, [Error] | term()}.
get_metadata([], _ConnConfig, _Topic, Errors) ->
{error, Errors};
get_metadata([Host | Rest], ConnConfig, TopicOrAlias, Errors) ->
get_metadata([Host | Rest], ConnConfig, Topic, Errors) ->
case do_connect(Host, ConnConfig) of
{ok, Pid} ->
Timeout = maps:get(request_timeout, ConnConfig, ?DEFAULT_METADATA_TIMEOUT),
case do_get_metadata(Pid, TopicOrAlias, Timeout) of
case do_get_metadata(Pid, Topic, Timeout) of
{ok, Result} ->
{ok, {Pid, Result}};
{error, Reason} ->
Expand All @@ -470,20 +509,22 @@ get_metadata([Host | Rest], ConnConfig, TopicOrAlias, Errors) ->
{error, Reason}
end;
{error, Reason} ->
get_metadata(Rest, ConnConfig, TopicOrAlias, [Reason | Errors])
get_metadata(Rest, ConnConfig, Topic, [Reason | Errors])
end.

do_get_metadata(Connection, TopicOrAlias, Timeout) ->
-spec do_get_metadata(connection(), topic(), timeout()) ->
{ok, {_Brokers, _Partitions}} | {error, term()}.
do_get_metadata(Connection, Topic, Timeout) ->
case kpro:get_api_versions(Connection) of
{ok, Vsns} ->
{_, Vsn} = maps:get(metadata, Vsns),
do_get_metadata2(Vsn, Connection, TopicOrAlias, Timeout);
do_get_metadata2(Vsn, Connection, Topic, Timeout);
{error, Reason} ->
{error, Reason}
end.

do_get_metadata2(Vsn, Connection, TopicOrAlias, Timeout) ->
Topic = get_topic(TopicOrAlias),
-spec do_get_metadata2(_Vsn, connection(), topic(), timeout()) -> {ok, {_, _}} | {error, term()}.
do_get_metadata2(Vsn, Connection, Topic, Timeout) ->
Req = kpro_req_lib:metadata(Vsn, [Topic], _IsAutoCreateAllowed = false),
case kpro:request_sync(Connection, Req, Timeout) of
{ok, #kpro_rsp{msg = Meta}} ->
Expand Down
8 changes: 5 additions & 3 deletions src/wolff_producers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
%% tests
-export([find_producer_by_partition/3]).

-export_type([producers/0, config/0, partitioner/0, producer_alias/0, topic_or_alias/0, alias_and_topic/0]).
-export_type([producers/0, config/0, partitioner/0, producer_alias/0, topic_or_alias/0, alias_and_topic/0, max_partitions/0]).

-include("wolff.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
Expand All @@ -41,7 +41,7 @@
}.

-type producer_alias() :: binary().
-type alias_and_topic() :: {producer_alias(), topic()}.
-type alias_and_topic() :: {producer_alias() | undefined, topic()}.
-type topic() :: kpro:topic().
-type topic_or_alias() :: topic() | alias_and_topic().
-type partition() :: kpro:partition().
Expand All @@ -67,6 +67,8 @@
-define(partition_count_unavailable, -1).
-define(all_partitions, all_partitions).

-type max_partitions() :: pos_integer() | ?all_partitions.

%% @doc Called by wolff_producers_sup to start wolff_producers process.
start_link(ClientId, TopicOrAlias, Config) ->
Name = maps:get(name, Config, <<>>),
Expand Down Expand Up @@ -100,7 +102,7 @@ start_linked_producers(ClientId, ClientPid, TopicOrAlias, ProducerCfg) ->
end,
case wolff_client:get_leader_connections(ClientPid, AliasTopic, MaxPartitions) of
{ok, Connections} ->
Workers = start_link_producers(ClientId, TopicOrAlias, Connections, ProducerCfg),
Workers = start_link_producers(ClientId, AliasTopic, Connections, ProducerCfg),
ok = put_partition_cnt(ClientId, AliasTopic, maps:size(Workers)),
Partitioner = maps:get(partitioner, ProducerCfg, random),
{ok, #{client_id => ClientId,
Expand Down
46 changes: 46 additions & 0 deletions test/wolff_supervised_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,52 @@ test_supervised_producers(Name) ->
wolff_tests:deinstall_event_logging(?FUNCTION_NAME),
ok.

%% Checks that having different producers to the same kafka topic works.
different_producers_same_topic_test() ->
_ = application:stop(wolff), %% ensure stopped
{ok, _} = application:ensure_all_started(wolff),
ClientId = <<"same-topic">>,
ClientCfg = client_config(),
{ok, _ClientPid} = wolff:ensure_supervised_client(ClientId, ?HOSTS, ClientCfg),
Topic = <<"test-topic">>,
ProducerName0 = <<"p0">>,
Alias0 = <<"a0">>,
ProducerCfg0 = (producer_config(ProducerName0))#{required_acks => all_isr, alias => Alias0},
{ok, Producers0} = wolff:ensure_supervised_producers(ClientId, Topic, ProducerCfg0),
?assertEqual({ok, Producers0},
wolff:ensure_supervised_producers(ClientId, <<"test-topic">>, ProducerCfg0)),
ProducerName1 = <<"p1">>,
Alias1 = <<"a1">>,
ProducerCfg1 = (producer_config(ProducerName1))#{required_acks => all_isr, alias => Alias1},
{ok, Producers1} = wolff:ensure_supervised_producers(ClientId, Topic, ProducerCfg1),
?assertEqual({ok, Producers0},
wolff:ensure_supervised_producers(ClientId, <<"test-topic">>, ProducerCfg0)),
%% Each replayq dir should be namespaced by the alias.
#{replayq_dir := BaseDir} = ProducerCfg0,
{ok, Files} = file:list_dir(BaseDir),
ReplayQDirs = [File || File <- Files,
filelib:is_dir(filename:join([BaseDir, File])),
lists:member(list_to_binary(File), [Alias0, Alias1])],
?assertMatch([_, _], ReplayQDirs, #{base_dir => Files}),
%% We can send from each producer.
Msg = #{key => ?KEY, value => <<"value">>},
Self = self(),
AckFun = fun(_Partition, _BaseOffset) -> Self ! acked, ok end,
{_Partition0, _ProducerPid0} = wolff:send(Producers0, [Msg], AckFun),
receive acked -> ok end,
{_Partition1A, _ProducerPid1A} = wolff:send(Producers1, [Msg], AckFun),
receive acked -> ok end,
%% We now stop one of the producers. The other should keep working.
ok = wolff:stop_and_delete_supervised_producers(Producers0),
%% Some time for `wolff_client:delete_producers_metadata' to be processed.
timer:sleep(100),
{_Partition1B, _ProducerPid1B} = wolff:send(Producers1, [Msg], AckFun),
receive acked -> ok end,
ok = wolff:stop_and_delete_supervised_producers(Producers1),
ok = wolff:stop_and_delete_supervised_client(ClientId),
ok = application:stop(wolff),
ok.

client_restart_test() ->
ClientId = <<"client-restart-test">>,
Topic = <<"test-topic">>,
Expand Down

0 comments on commit b7c6cf1

Please sign in to comment.