fix: differentiate producers to the same topic with aliases
thalesmg committed Jun 24, 2024
1 parent a5cbf24 commit 636a005
Showing 8 changed files with 184 additions and 127 deletions.
3 changes: 3 additions & 0 deletions changelog.md
@@ -1,3 +1,6 @@
* 1.10.5
- Added the `alias` producer config option so that independent producers can be started for the same topic.

* 1.10.4 (merge 1.5.14)
- Split batch if `message_too_large` error code is received.
Prior to this fix, `wolff_producer` would retry the same batch indefinitely for any error code received from Kafka (`message_too_large` included).
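For illustration, a minimal sketch of how the new `alias` option might be used. It assumes `alias` is just another key in the producer config map, that its value is a binary, and that `wolff:ensure_supervised_producers/3` returns `{ok, Producers}`; none of these details are spelled out in this diff.

%% Two producer sets for the same topic, kept independent by distinct aliases;
%% the handles refer to separate, independently supervised producer processes.
{ok, ProducersA} = wolff:ensure_supervised_producers(ClientId, <<"events">>,
                                                     #{alias => <<"ingest_a">>}),
{ok, ProducersB} = wolff:ensure_supervised_producers(ClientId, <<"events">>,
                                                     #{alias => <<"ingest_b">>}).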
4 changes: 4 additions & 0 deletions docker-compose.yml
@@ -3,6 +3,10 @@ services:
zookeeper:
image: wurstmeister/zookeeper
container_name: wolff-zk
ulimits:
nofile:
soft: 65536
hard: 65536
kafka_1:
depends_on:
- zookeeper
2 changes: 1 addition & 1 deletion src/wolff.app.src
@@ -1,6 +1,6 @@
{application, wolff,
[{description, "Kafka's publisher"},
{vsn, "1.10.4"},
{vsn, "1.10.5"},
{registered, []},
{applications,
[kernel,
2 changes: 1 addition & 1 deletion src/wolff.appup.src
@@ -1,5 +1,5 @@
%% -*- mode: erlang; -*-
{"1.10.4",
{"1.10.5",
[
],
[
2 changes: 1 addition & 1 deletion src/wolff.erl
@@ -100,7 +100,7 @@ ensure_supervised_producers(ClientId, Topic, ProducerCfg) ->
%% @hidden Deprecated.
-spec stop_and_delete_supervised_producers(client_id(), topic(), name()) -> ok.
stop_and_delete_supervised_producers(ClientId, Topic, _Name) ->
stop_and_delete_supervised_producers(ClientId, Topic).
stop_and_delete_supervised_producers(ClientId, {_Alias = undefined, Topic}).

%% @doc Ensure supervised producers are stopped then deleted.
stop_and_delete_supervised_producers(ClientId, Topic) ->
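The deprecated 3-arity stop call above now wraps its topic as `{_Alias = undefined, Topic}`: producers started without an alias are addressed internally by an alias-and-topic pair whose alias is `undefined`. A hedged sketch of the equivalence (the pair form passed to the 2-arity function is taken from the diff; the `ok` return of the 2-arity call is an assumption):

%% Illustration only: either call addresses the same alias-less producer set.
ok = wolff:stop_and_delete_supervised_producers(ClientId, <<"events">>, ignored_name),
%% ...which, after this change, is routed as:
ok = wolff:stop_and_delete_supervised_producers(ClientId, {undefined, <<"events">>}).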
117 changes: 65 additions & 52 deletions src/wolff_client.erl
@@ -37,10 +37,12 @@

-type config() :: map().
-type topic() :: kpro:topic().
-type alias_and_topic() :: wolff_producers:alias_and_topic().
-type topic_or_alias() :: topic() | alias_and_topic().
-type partition() :: kpro:partition().
-type connection() :: kpro:connection().
-type host() :: wolff:host().
-type conn_id() :: {topic(), partition()} | host().
-type conn_id() :: {topic_or_alias(), partition()} | host().

-type state() ::
#{client_id := wolff:client_id(),
@@ -49,11 +51,11 @@
conn_config := kpro:conn_config(),
conns := #{conn_id() => connection()},
metadata_conn := pid() | not_initialized | down,
metadata_ts := #{topic() => erlang:timestamp()},
metadata_ts := #{topic_or_alias() => erlang:timestamp()},
%% only applicable when connection strategy is per_broker
%% because in this case the connections are keyed by host()
%% but we need to find connection by {topic(), partition()}
leaders => #{{topic(), partition()} => connection()}
leaders => #{{topic_or_alias(), partition()} => connection()}
}.

-define(DEFAULT_METADATA_TIMEOUT, 10000).
@@ -94,15 +96,17 @@ stop(Pid) ->
get_id(Pid) ->
gen_server:call(Pid, get_id, infinity).

-spec get_leader_connections(pid(), topic()) ->
-spec get_leader_connections(pid(), topic_or_alias()) ->
{ok, [{partition(), pid() | ?conn_down(_)}]} | {error, any()}.
get_leader_connections(Client, Topic) ->
safe_call(Client, {get_leader_connections, Topic, all_partitions}).
get_leader_connections(Client, TopicOrAlias0) ->
TopicOrAlias = ensure_has_alias(TopicOrAlias0),
safe_call(Client, {get_leader_connections, TopicOrAlias, all_partitions}).

-spec get_leader_connections(pid(), topic(), all_partitions | pos_integer()) ->
-spec get_leader_connections(pid(), topic_or_alias(), all_partitions | pos_integer()) ->
{ok, [{partition(), pid() | ?conn_down(_)}]} | {error, any()}.
get_leader_connections(Client, Topic, MaxPartitions) ->
safe_call(Client, {get_leader_connections, Topic, MaxPartitions}).
get_leader_connections(Client, TopicOrAlias0, MaxPartitions) ->
TopicOrAlias = ensure_has_alias(TopicOrAlias0),
safe_call(Client, {get_leader_connections, TopicOrAlias, MaxPartitions}).

%% @doc Check if client has a metadata connection alive.
%% Trigger a reconnect if the connection is down for whatever reason.
@@ -147,8 +151,8 @@ safe_call(Pid, Call) ->
recv_leader_connection(Client, Topic, Partition, Pid, MaxPartitions) ->
gen_server:cast(Client, {recv_leader_connection, Topic, Partition, Pid, MaxPartitions}).

delete_producers_metadata(Client, Topic) ->
gen_server:cast(Client, {delete_producers_metadata, Topic}).
delete_producers_metadata(Client, TopicOrAlias) ->
gen_server:cast(Client, {delete_producers_metadata, TopicOrAlias}).

init(#{client_id := ClientID} = St) ->
erlang:process_flag(trap_exit, true),
@@ -167,10 +171,10 @@ handle_call({check_if_topic_exists, Topic}, _From, #{conn_config := ConnConfig}
{error, Reason} ->
{reply, {error, Reason}, St0}
end;
handle_call({get_leader_connections, Topic, MaxPartitions}, _From, St0) ->
case ensure_leader_connections(St0, Topic, MaxPartitions) of
handle_call({get_leader_connections, TopicOrAlias, MaxPartitions}, _From, St0) ->
case ensure_leader_connections(St0, TopicOrAlias, MaxPartitions) of
{ok, St} ->
Result = do_get_leader_connections(St, Topic),
Result = do_get_leader_connections(St, TopicOrAlias),
{reply, {ok, Result}, St};
{error, Reason} ->
{reply, {error, Reason}, St0}
@@ -208,9 +212,9 @@ handle_cast({recv_leader_connection, Topic, Partition, Caller, MaxConnections},
{noreply, St0}
end;

handle_cast({delete_producers_metadata, Topic}, #{metadata_ts := Topics, conns := Conns} = St) ->
Conns1 = maps:without( [K || K = {K1, _} <- maps:keys(Conns), K1 =:= Topic ], Conns),
{noreply, St#{metadata_ts => maps:remove(Topic, Topics), conns => Conns1}};
handle_cast({delete_producers_metadata, TopicOrAlias}, #{metadata_ts := Topics, conns := Conns} = St) ->
Conns1 = maps:without( [K || K = {K1, _} <- maps:keys(Conns), K1 =:= TopicOrAlias ], Conns),
{noreply, St#{metadata_ts => maps:remove(TopicOrAlias, Topics), conns => Conns1}};

handle_cast(_Cast, St) ->
{noreply, St}.
@@ -269,12 +273,12 @@ do_close_connection(Pid) ->
exit(Pid, kill)
end.

do_get_leader_connections(#{conns := Conns} = St, Topic) ->
do_get_leader_connections(#{conns := Conns} = St, TopicOrAlias) ->
FindInMap = case get_connection_strategy(St) of
per_partition -> Conns;
per_broker -> maps:get(leaders, St)
end,
F = fun({T, P}, MaybePid, Acc) when T =:= Topic ->
F = fun({T, P}, MaybePid, Acc) when T =:= TopicOrAlias ->
case is_alive(MaybePid) of
true ->
[{P, MaybePid} | Acc];
@@ -288,49 +292,49 @@ do_get_leader_connections(#{conns := Conns} = St, Topic) ->
maps:fold(F, [], FindInMap).

%% return true if there is no need to refresh metadata because the last one is fresh enough
is_metadata_fresh(#{metadata_ts := Topics, config := Config}, Topic) ->
is_metadata_fresh(#{metadata_ts := Topics, config := Config}, TopicOrAlias) ->
MinInterval = maps:get(min_metadata_refresh_interval, Config, ?MIN_METADATA_REFRESH_INTERVAL),
case maps:get(Topic, Topics, false) of
case maps:get(TopicOrAlias, Topics, false) of
false -> false;
Ts -> timer:now_diff(erlang:timestamp(), Ts) < MinInterval * 1000
end.

-spec ensure_leader_connections(state(), topic(), all_partitions | pos_integer()) ->
-spec ensure_leader_connections(state(), topic_or_alias(), all_partitions | pos_integer()) ->
{ok, state()} | {error, any()}.
ensure_leader_connections(St, Topic, MaxPartitions) ->
case is_metadata_fresh(St, Topic) of
ensure_leader_connections(St, TopicOrAlias, MaxPartitions) ->
case is_metadata_fresh(St, TopicOrAlias) of
true -> {ok, St};
false -> ensure_leader_connections2(St, Topic, MaxPartitions)
false -> ensure_leader_connections2(St, TopicOrAlias, MaxPartitions)
end.

ensure_leader_connections2(#{metadata_conn := Pid, conn_config := ConnConfig} = St, Topic, MaxPartitions) when is_pid(Pid) ->
ensure_leader_connections2(#{metadata_conn := Pid, conn_config := ConnConfig} = St, TopicOrAlias, MaxPartitions) when is_pid(Pid) ->
Timeout = maps:get(request_timeout, ConnConfig, ?DEFAULT_METADATA_TIMEOUT),
case do_get_metadata(Pid, Topic, Timeout) of
case do_get_metadata(Pid, TopicOrAlias, Timeout) of
{ok, {Brokers, PartitionMetaList}} ->
ensure_leader_connections3(St, Topic, Pid, Brokers, PartitionMetaList, MaxPartitions);
ensure_leader_connections3(St, TopicOrAlias, Pid, Brokers, PartitionMetaList, MaxPartitions);
{error, _Reason} ->
%% ensure metadata connection is down, try to establish a new one in the next clause,
%% reason is discarded here, because the next clause will log error if the immediate retry fails
exit(Pid, kill),
ensure_leader_connections2(St#{metadata_conn => down}, Topic, MaxPartitions)
ensure_leader_connections2(St#{metadata_conn => down}, TopicOrAlias, MaxPartitions)
end;
ensure_leader_connections2(#{conn_config := ConnConfig,
seed_hosts := SeedHosts} = St, Topic, MaxPartitions) ->
case get_metadata(SeedHosts, ConnConfig, Topic, []) of
seed_hosts := SeedHosts} = St, TopicOrAlias, MaxPartitions) ->
case get_metadata(SeedHosts, ConnConfig, TopicOrAlias, []) of
{ok, {ConnPid, {Brokers, PartitionMetaList}}} ->
ensure_leader_connections3(St, Topic, ConnPid, Brokers, PartitionMetaList, MaxPartitions);
ensure_leader_connections3(St, TopicOrAlias, ConnPid, Brokers, PartitionMetaList, MaxPartitions);
{error, Errors} ->
log_warn(failed_to_fetch_metadata, #{topic => Topic, errors => Errors}),
log_warn(failed_to_fetch_metadata, #{topic => get_topic(TopicOrAlias), errors => Errors}),
{error, failed_to_fetch_metadata}
end.

ensure_leader_connections3(#{metadata_ts := MetadataTs} = St0, Topic,
ensure_leader_connections3(#{metadata_ts := MetadataTs} = St0, TopicOrAlias,
ConnPid, Brokers, PartitionMetaList0, MaxPartitions) ->
PartitionMetaList = limit_partitions_count(PartitionMetaList0, MaxPartitions),
St = lists:foldl(fun(PartitionMeta, StIn) ->
ensure_leader_connection(StIn, Brokers, Topic, PartitionMeta)
ensure_leader_connection(StIn, Brokers, TopicOrAlias, PartitionMeta)
end, St0, PartitionMetaList),
{ok, St#{metadata_ts := MetadataTs#{Topic => erlang:timestamp()},
{ok, St#{metadata_ts := MetadataTs#{TopicOrAlias => erlang:timestamp()},
metadata_conn => ConnPid
}}.

@@ -342,24 +346,24 @@ limit_partitions_count(PartitionMetaList, _) ->
%% This function ensures each Topic-Partition pair has a connection record
%% either a pid when the leader is healthy, or the error reason
%% if failed to discover the leader or failed to connect to the leader
ensure_leader_connection(St, Brokers, Topic, P_Meta) ->
ensure_leader_connection(St, Brokers, TopicOrAlias, P_Meta) ->
PartitionNum = kpro:find(partition_index, P_Meta),
ErrorCode = kpro:find(error_code, P_Meta),
case ErrorCode =:= ?no_error of
true ->
do_ensure_leader_connection(St, Brokers, Topic, PartitionNum, P_Meta);
do_ensure_leader_connection(St, Brokers, TopicOrAlias, PartitionNum, P_Meta);
false ->
maybe_disconnect_old_leader(St, Topic, PartitionNum, ErrorCode)
maybe_disconnect_old_leader(St, TopicOrAlias, PartitionNum, ErrorCode)
end.

do_ensure_leader_connection(#{conn_config := ConnConfig,
conns := Connections0
} = St0, Brokers, Topic, PartitionNum, P_Meta) ->
} = St0, Brokers, TopicOrAlias, PartitionNum, P_Meta) ->
LeaderBrokerId = kpro:find(leader_id, P_Meta),
{_, Host} = lists:keyfind(LeaderBrokerId, 1, Brokers),
Strategy = get_connection_strategy(St0),
ConnId = case Strategy of
per_partition -> {Topic, PartitionNum};
per_partition -> {TopicOrAlias, PartitionNum};
per_broker -> Host
end,
Connections =
@@ -377,19 +381,19 @@ do_ensure_leader_connection(#{conn_config := ConnConfig,
Leaders0 = maps:get(leaders, St0, #{}),
case Strategy of
per_broker ->
Leaders = Leaders0#{{Topic, PartitionNum} => maps:get(ConnId, Connections)},
Leaders = Leaders0#{{TopicOrAlias, PartitionNum} => maps:get(ConnId, Connections)},
St#{leaders => Leaders};
_ ->
St
end.

%% Handle error code in partition metadata.
maybe_disconnect_old_leader(#{conns := Connections} = St, Topic, PartitionNum, ErrorCode) ->
maybe_disconnect_old_leader(#{conns := Connections} = St, TopicOrAlias, PartitionNum, ErrorCode) ->
Strategy = get_connection_strategy(St),
case Strategy of
per_partition ->
%% partition metadata has error code, there is no need to keep the old connection alive
ConnId = {Topic, PartitionNum},
ConnId = {TopicOrAlias, PartitionNum},
MaybePid = maps:get(ConnId, Connections, false),
is_pid(MaybePid) andalso close_connection(MaybePid),
St#{conns := add_conn({error, ErrorCode}, ConnId, Connections)};
@@ -400,7 +404,7 @@ maybe_disconnect_old_leader(#{conns := Connections} = St, Topic, PartitionNum, E
%% (due to the error code returned) there is no way to know which
%% connection to close anyway.
Leaders0 = maps:get(leaders, St, #{}),
Leaders = Leaders0#{{Topic, PartitionNum} => ErrorCode},
Leaders = Leaders0#{{TopicOrAlias, PartitionNum} => ErrorCode},
St#{leaders => Leaders}
end.

@@ -451,13 +455,13 @@ get_metadata(Hosts, _ConnectFun, _Topic) when Hosts =:= [] ->
get_metadata(Hosts, ConnectFun, Topic) ->
get_metadata(Hosts, ConnectFun, Topic, []).

get_metadata([], _ConnectFun, _Topic, Errors) ->
get_metadata([], _ConnectFun, _TopicOrAlias, Errors) ->
{error, Errors};
get_metadata([Host | Rest], ConnConfig, Topic, Errors) ->
get_metadata([Host | Rest], ConnConfig, TopicOrAlias, Errors) ->
case do_connect(Host, ConnConfig) of
{ok, Pid} ->
Timeout = maps:get(request_timeout, ConnConfig, ?DEFAULT_METADATA_TIMEOUT),
case do_get_metadata(Pid, Topic, Timeout) of
case do_get_metadata(Pid, TopicOrAlias, Timeout) of
{ok, Result} ->
{ok, {Pid, Result}};
{error, Reason} ->
@@ -466,19 +470,20 @@ get_metadata([Host | Rest], ConnConfig, Topic, Errors) ->
{error, Reason}
end;
{error, Reason} ->
get_metadata(Rest, ConnConfig, Topic, [Reason | Errors])
get_metadata(Rest, ConnConfig, TopicOrAlias, [Reason | Errors])
end.

do_get_metadata(Connection, Topic, Timeout) ->
do_get_metadata(Connection, TopicOrAlias, Timeout) ->
case kpro:get_api_versions(Connection) of
{ok, Vsns} ->
{_, Vsn} = maps:get(metadata, Vsns),
do_get_metadata2(Vsn, Connection, Topic, Timeout);
do_get_metadata2(Vsn, Connection, TopicOrAlias, Timeout);
{error, Reason} ->
{error, Reason}
end.

do_get_metadata2(Vsn, Connection, Topic, Timeout) ->
do_get_metadata2(Vsn, Connection, TopicOrAlias, Timeout) ->
Topic = get_topic(TopicOrAlias),
Req = kpro_req_lib:metadata(Vsn, [Topic], _IsAutoCreateAllowed = false),
case kpro:request_sync(Connection, Req, Timeout) of
{ok, #kpro_rsp{msg = Meta}} ->
@@ -561,3 +566,11 @@ bin(X) ->
{error, _} -> bin(io_lib:format("~0p", [X]));
Addr -> bin(Addr)
end.

-spec get_topic(topic_or_alias()) -> topic().
get_topic({_Alias, Topic}) -> Topic;
get_topic(Topic) -> Topic.

-spec ensure_has_alias(topic_or_alias()) -> alias_and_topic().
ensure_has_alias({Alias, Topic}) -> {Alias, Topic};
ensure_has_alias(Topic) -> {undefined, Topic}.
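For clarity, a short sketch of what the two new helpers return for either input shape; this follows directly from the clauses above and is illustration only:

%% ensure_has_alias/1 normalizes a bare topic into an alias-and-topic pair:
{undefined, <<"events">>} = ensure_has_alias(<<"events">>),
{<<"a">>, <<"events">>}   = ensure_has_alias({<<"a">>, <<"events">>}),
%% get_topic/1 recovers the plain Kafka topic from either shape:
<<"events">> = get_topic({<<"a">>, <<"events">>}),
<<"events">> = get_topic(<<"events">>).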
