Skip to content

Commit

Permalink
Merge pull request #492 from kafka4beam/fix/timeout-brod-client
Browse files Browse the repository at this point in the history
safe_gen_call catches more exit exceptions
  • Loading branch information
zmstone authored Feb 6, 2022
2 parents a9c193c + 32d09d6 commit 2253f67
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 23 deletions.
14 changes: 13 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
* 3.16.2
* Update kafka_protocol from 4.0.1 to 4.0.3.
Prior to this change the actual time spent in establishing a
Kafka connection might be longer than desired due to the timeout
being used in SSL upgrade (if enabled), then API version query.
This has been fixed by turning the given timeout config
into a deadline, and the sub-steps will try to meet the deadline.
see more details here: https://github.com/kafka4beam/kafka_protocol/pull/9
* Catch `timeout` and other `DOWN` reasons when making `gen_server` call to
`brod_client`, `brod_consumer` and producer/consumer supervisor,
and return as `Reason` in `{error, Reason}`.
Previously only `noproc` reaon is caught. (#492)
* Propagate `connect_timeout` config to `kpro` API functions as `timeout` arg
affected APIs: connect_group_coordinator, create_topics, delete_topics,
resolve_offset, fetch, fold, fetch_committed_offsets
resolve_offset, fetch, fold, fetch_committed_offsets (#458)
* Fix bad field name in group describe request (#486)
* 3.16.1
* Fix `brod` script in `brod-cli` in release.
* Support `rebalance_timeout` consumer group option
Expand Down
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{deps, [ {supervisor3, "1.1.11"}
, {kafka_protocol, "4.0.1"}
, {kafka_protocol, "4.0.3"}
, {snappyer, "1.2.8"}
]}.
{edoc_opts, [{preprocess, true}, {macros, [{build_brod_cli, true}]}]}.
Expand Down
6 changes: 4 additions & 2 deletions src/brod.erl
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,8 @@ get_partitions_count(Client, Topic) ->
-spec get_consumer(client(), topic(), partition()) ->
{ok, pid()} | {error, Reason}
when Reason :: client_down
| {consumer_down, noproc}
| {client_down, any()}
| {consumer_down, any()}
| {consumer_not_found, topic()}
| {consumer_not_found, topic(), partition()}.
get_consumer(Client, Topic, Partition) ->
Expand All @@ -465,7 +466,8 @@ get_consumer(Client, Topic, Partition) ->
-spec get_producer(client(), topic(), partition()) ->
{ok, pid()} | {error, Reason}
when Reason :: client_down
| {producer_down, noproc}
| {client_down, any()}
| {producer_down, any()}
| {producer_not_found, topic()}
| {producer_not_found, topic(), partition()}.
get_producer(Client, Topic, Partition) ->
Expand Down
22 changes: 13 additions & 9 deletions src/brod_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@
| ?CONSUMER_KEY(topic(), partition()).

-type get_producer_error() :: client_down
| {producer_down, noproc}
| {client_down, any()}
| {producer_down, any()}
| {producer_not_found, topic()}
| { producer_not_found
, topic()
, partition()}.
| {producer_not_found, topic(), partition()}.

-type get_consumer_error() :: client_down
| {consumer_down, noproc}
| {client_down, any()}
| {consumer_down, any()}
| {consumer_not_found, topic()}
| {consumer_not_found, topic(), partition()}.

Expand Down Expand Up @@ -829,16 +829,20 @@ ensure_partition_workers(TopicName, State, F) ->
end
end).

%% Catch noproc exit exception when making gen_server:call.
%% Catches exit exceptions when making gen_server:call.
-spec safe_gen_call(pid() | atom(), Call, Timeout) -> Return
when Call :: term(),
Timeout :: infinity | integer(),
Return :: ok | {ok, term()} | {error, client_down | term()}.
Return :: ok | {ok, term()} | {error, Reason},
Reason :: client_down | {client_down, any()} | any().
safe_gen_call(Server, Call, Timeout) ->
try
gen_server:call(Server, Call, Timeout)
catch exit : {noproc, _} ->
{error, client_down}
catch
exit : {noproc, _} ->
{error, client_down};
exit : {Reason, _} ->
{error, {client_down, Reason}}
end.

-spec kf(kpro:field_name(), kpro:struct()) -> kpro:field_value().
Expand Down
9 changes: 5 additions & 4 deletions src/brod_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -794,16 +794,17 @@ reset_buffer(#state{ pending_acks = #pending_acks{queue = Queue}
, last_req_ref = ?undef
}.

%% Catch noproc exit exception when making gen_server:call.
%% Catch exit exceptions when making gen_server:call.
-spec safe_gen_call(pid() | atom(), Call, Timeout) -> Return
when Call :: term(),
Timeout :: infinity | integer(),
Return :: ok | {ok, term()} | {error, consumer_down | term()}.
Return :: ok | {ok, term()} | {error, any()}.
safe_gen_call(Server, Call, Timeout) ->
try
gen_server:call(Server, Call, Timeout)
catch exit : {noproc, _} ->
{error, consumer_down}
catch
exit : {Reason, _} ->
{error, Reason}
end.

%% Init payload connection regardless of subscriber state.
Expand Down
6 changes: 3 additions & 3 deletions src/brod_consumers_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ stop_consumer(SupPid, TopicName) ->
{ok, pid()} | {error, Reason} when
Reason :: {consumer_not_found, brod:topic()}
| {consumer_not_found, brod:topic(), brod:partition()}
| {consumer_down, noproc}.
| {consumer_down, any()}.
find_consumer(SupPid, Topic, Partition) ->
case supervisor3:find_child(SupPid, Topic) of
[] ->
Expand All @@ -83,8 +83,8 @@ find_consumer(SupPid, Topic, Partition) ->
[Pid] ->
{ok, Pid}
end
catch exit : {noproc, _} ->
{error, {consumer_down, noproc}}
catch exit : {Reason, _} ->
{error, {consumer_down, Reason}}
end
end.

Expand Down
6 changes: 3 additions & 3 deletions src/brod_producers_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ stop_producer(SupPid, TopicName) ->
{ok, pid()} | {error, Reason} when
Reason :: {producer_not_found, brod:topic()}
| {producer_not_found, brod:topic(), brod:partition()}
| {producer_down, noproc}.
| {producer_down, any()}.
find_producer(SupPid, Topic, Partition) ->
case supervisor3:find_child(SupPid, Topic) of
[] ->
Expand All @@ -87,8 +87,8 @@ find_producer(SupPid, Topic, Partition) ->
[Pid] ->
{ok, Pid}
end
catch exit : {noproc, _} ->
{error, {producer_down, noproc}}
catch exit : {Reason, _} ->
{error, {producer_down, Reason}}
end
end.

Expand Down

0 comments on commit 2253f67

Please sign in to comment.