Skip to content

Commit

Permalink
fix(wolff_client): keep metadata connection alive
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Feb 7, 2024
1 parent 12291a2 commit d56fe6a
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 34 deletions.
5 changes: 5 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
* 1.5.13
- Use long-lived metadata connection.
This is to avoid having to excessively re-establish connection when there are many concurrent connectivity checks.
* 1.5.12
- Fix connection error reason translation, the error log is now more compact when e.g. connect timeout happens.
* 1.5.11
- Fixed a try catch pattern in `gen_server` call towards client process, this should prevent `wolff_producers` from crash if `wolff_client` is killed during initialization.
* 1.5.10
Expand Down
2 changes: 1 addition & 1 deletion src/wolff.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, wolff,
[{description, "Kafka's publisher"},
{vsn, "1.5.12"},
{vsn, "1.5.13"},
{registered, []},
{applications,
[kernel,
Expand Down
6 changes: 3 additions & 3 deletions src/wolff.appup.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{"1.5.12",
{"1.5.13",
[
{"1.5.11",
{<<"1\\.5\\.1[1-2]">>,
[ {load_module, wolff_client, brutal_purge, soft_purge, []}
]},
{"1.5.10",
Expand Down Expand Up @@ -37,7 +37,7 @@
}
],
[
{"1.5.11",
{<<"1\\.5\\.1[1-2]">>,
[ {load_module, wolff_client, brutal_purge, soft_purge, []}
]},
{"1.5.10",
Expand Down
102 changes: 72 additions & 30 deletions src/wolff_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
config := config(),
conn_config := kpro:conn_config(),
conns := #{conn_id() => connection()},
metadata_conn := pid() | not_initialized | down,
metadata_ts := #{topic() => erlang:timestamp()},
%% only applicable when connection strategy is per_broker
%% because in this case the connections are keyed by host()
Expand Down Expand Up @@ -70,6 +71,7 @@ start_link(ClientId, Hosts, Config) ->
config => MyCfg,
conn_config => ConnCfg,
conns => #{},
metadata_conn => not_initialized,
metadata_ts => #{},
leaders => #{}
},
Expand All @@ -89,19 +91,21 @@ get_id(Pid) ->
get_leader_connections(Client, Topic) ->
safe_call(Client, {get_leader_connections, Topic}).

%% @doc Check if client has a metadata connection alive.
%% Trigger a reconnect if the connection is down for whatever reason.
-spec check_connectivity(pid()) -> ok | {error, any()}.
check_connectivity(Pid) ->
safe_call(Pid, check_connectivity).

%% @doc Connect to any host in the list and immediately disconnect.
-spec check_connectivity([host()], kpro:conn_config()) -> ok | {error, any()}.
check_connectivity(Hosts, ConnConfig) ->
case kpro:connect_any(Hosts, ConnConfig) of
{ok, Conn} ->
close_connection(Conn),
ok;
{error, Reasons} ->
{error, tr_reasons(Reasons)}
end.
case kpro:connect_any(Hosts, ConnConfig) of
{ok, Conn} ->
ok = close_connection(Conn);
{error, Reasons} ->
{error, tr_reasons(Reasons)}
end.

safe_call(Pid, Call) ->
try gen_server:call(Pid, Call, infinity)
Expand Down Expand Up @@ -135,9 +139,21 @@ handle_call(stop, From, #{conns := Conns} = St) ->
ok = close_connections(Conns),
gen_server:reply(From, ok),
{stop, normal, St#{conns := #{}}};
handle_call(check_connectivity, _From, #{seed_hosts := Hosts, conn_config := ConnConfig} = St) ->
Res = check_connectivity(Hosts, ConnConfig),
{reply, Res, St};
handle_call(check_connectivity, _From,
#{seed_hosts := Hosts,
conn_config := ConnConfig} = St) ->
Pid = maps:get(metadata_conn, St, none),
case is_pid(Pid) andalso erlang:is_process_alive(Pid) of
true ->
{reply, ok, St};
false ->
case kpro:connect_any(Hosts, ConnConfig) of
{ok, NewPid} ->
{reply, ok, St#{metadata_conn := NewPid}};
{error, Reasons} ->
{reply, {error, tr_reasons(Reasons)}, St}
end
end;
handle_call(_Call, _From, St) ->
{noreply, St}.

Expand Down Expand Up @@ -171,7 +187,9 @@ code_change(_OldVsn, St, _Extra) ->
{ok, St}.

terminate(_, #{conns := Conns} = St) ->
MetadataConn = maps:get(metadata_conn, St, none),
ok = close_connections(Conns),
ok = close_connection(MetadataConn),
{ok, St#{conns := #{}}}.

%% == internals ======================================================
Expand Down Expand Up @@ -228,24 +246,37 @@ is_metadata_fresh(#{metadata_ts := Topics, config := Config}, Topic) ->
ensure_leader_connections(St, Topic) ->
case is_metadata_fresh(St, Topic) of
true -> {ok, St};
false -> do_ensure_leader_connections(St, Topic)
false -> ensure_leader_connections2(St, Topic)
end.

do_ensure_leader_connections(#{conn_config := ConnConfig,
seed_hosts := SeedHosts,
metadata_ts := MetadataTs
} = St0, Topic) ->
case get_metadata(SeedHosts, ConnConfig, Topic, []) of
ensure_leader_connections2(#{metadata_conn := Pid} = St, Topic) when is_pid(Pid) ->
case do_get_metadata(Pid, Topic) of
{ok, {Brokers, PartitionMetaList}} ->
St = lists:foldl(fun(PartitionMeta, StIn) ->
ensure_leader_connection(StIn, Brokers, Topic, PartitionMeta)
end, St0, PartitionMetaList),
{ok, St#{metadata_ts := MetadataTs#{Topic => erlang:timestamp()}}};
ensure_leader_connections3(St, Topic, Pid, Brokers, PartitionMetaList);
{error, _Reason} ->
%% metadata connection is down, try to establish a new one
%% error is discarded here, will log error if the immediate retry (next clause) fails
ensure_leader_connections2(St#{metadata_conn := down}, Topic)
end;
ensure_leader_connections2(#{conn_config := ConnConfig,
seed_hosts := SeedHosts} = St, Topic) ->
case get_metadata(SeedHosts, ConnConfig, Topic, []) of
{ok, {ConnPid, {Brokers, PartitionMetaList}}} ->
ensure_leader_connections3(St, Topic, ConnPid, Brokers, PartitionMetaList);
{error, Reason} ->
log_warn("Failed to get metadata\nreason: ~p", [Reason]),
{error, failed_to_fetch_metadata}
end.

ensure_leader_connections3(#{metadata_ts := MetadataTs} = St0, Topic,
ConnPid, Brokers, PartitionMetaList) ->
St = lists:foldl(fun(PartitionMeta, StIn) ->
ensure_leader_connection(StIn, Brokers, Topic, PartitionMeta)
end, St0, PartitionMetaList),
{ok, St#{metadata_ts := MetadataTs#{Topic => erlang:timestamp()},
metadata_conn := ConnPid
}}.

%% This function ensures each Topic-Partition pair has a connection record
%% either a pid when the leader is healthy, or the error reason
%% if failed to discover the leader or failed to connect to the leader
Expand Down Expand Up @@ -354,23 +385,32 @@ split_config(Config) ->
{maps:from_list(ConnCfg), maps:from_list(MyCfg)}.

get_metadata([], _ConnectFun, _Topic, Errors) ->
%% failed to connect to ALL seed hosts, crash instead of return {error, Reason}
{error, Errors};
get_metadata([Host | Rest], ConnConfig, Topic, Errors) ->
case do_connect(Host, ConnConfig) of
{ok, Pid} ->
try
{ok, Vsns} = kpro:get_api_versions(Pid),
{_, Vsn} = maps:get(metadata, Vsns),
do_get_metadata(Vsn, Pid, Topic)
after
_ = close_connection(Pid)
case do_get_metadata(Pid, Topic) of
{ok, Result} ->
{ok, {Pid, Result}};
{error, Reason} ->
%% failed to fetch metadata, make sure this connection is closed
ok = close_connection(Pid),
{error, Reason}
end;
{error, Reason} ->
get_metadata(Rest, ConnConfig, Topic, [Reason | Errors])
end.

do_get_metadata(Vsn, Connection, Topic) ->
do_get_metadata(Connection, Topic) ->
case kpro:get_api_versions(Connection) of
{ok, Vsns} ->
{_, Vsn} = maps:get(metadata, Vsns),
do_get_metadata2(Vsn, Connection, Topic);
{error, Reason} ->
{error, Reason}
end.

do_get_metadata2(Vsn, Connection, Topic) ->
Req = kpro:make_request(metadata, Vsn, [{topics, [Topic]},
{allow_auto_topic_creation, false}]),
case kpro:request_sync(Connection, Req, ?DEFAULT_METADATA_TIMEOUT) of
Expand All @@ -381,8 +421,10 @@ do_get_metadata(Vsn, Connection, Topic) ->
ErrorCode = kpro:find(error_code, TopicMeta),
Partitions = kpro:find(partition_metadata, TopicMeta),
case ErrorCode =:= ?no_error of
true -> {ok, {Brokers, Partitions}};
false -> {error, ErrorCode} %% no such topic ?
true ->
{ok, {Brokers, Partitions}};
false ->
{error, ErrorCode} %% no such topic ?
end;
{error, Reason} ->
{error, Reason}
Expand Down
18 changes: 18 additions & 0 deletions test/wolff_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,24 @@ ack_cb(Partition, Offset, Self, Ref) ->
Self ! {ack, Ref, Partition, Offset},
ok.

metadata_connection_restart_test() ->
ClientCfg = client_config(),
ClientId = <<"client-1">>,
{ok, Client} = start_client(ClientId, ?HOSTS, ClientCfg),
GetMetadataConn = fun() ->
ok = wolff:check_connectivity(ClientId),
ok = wolff_client:check_connectivity(Client),
State = sys:get_state(Client),
Pid = maps:get(metadata_conn, State),
?assert(is_process_alive(Pid)),
Pid
end,
Pid1 = GetMetadataConn(),
exit(Pid1, kill),
Pid2 = GetMetadataConn(),
ok = stop_client(Client),
?assertNot(is_process_alive(Pid2)).

send_test() ->
ClientCfg = client_config(),
{ok, Client} = start_client(<<"client-1">>, ?HOSTS, ClientCfg),
Expand Down

0 comments on commit d56fe6a

Please sign in to comment.