Skip to content

Commit

Permalink
fix(wolff_client): respect request_timeout for metdata request
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Nov 6, 2023
1 parent c9c5b6b commit ee45341
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ wolff:send(Producers, [Msg], AckFun).
a delay before trying to reconnect.

* Connection level configs are merged into `wolff` client config, including:
`iconnect_timeout`, `client_id`, `extra_sock_opts`, `query_api_versions`,
`request_timeout`, `sasl` and `ssl`. Ref: [kpro_connection.erl](https://github.com/klarna/kafka_protocol/blob/master/src/kpro_connection.erl)
`connect_timeout`, `client_id`, `extra_sock_opts`, `query_api_versions`,
`request_timeout`, `sasl` and `ssl`.
Ref: [kpro_connection.erl](https://github.com/klarna/kafka_protocol/blob/master/src/kpro_connection.erl)

## Producer Config

Expand Down
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Since 1.7.0, there there is a better integration for metrics.
- For supervised producers, use a global ets table (named `wolff_producers_global`) to store producer workers.
This should avoid having to create an atom for each supervised topic producer.
- Respect `request_timeout` in connection config when fetching metadta.

* 1.8.0
- Add wolff:check_if_topic_exists/2 for checking if a topic exists making use of an existing client process. [#52](https://github.com/kafka4beam/wolff/pull/52)
Expand Down
8 changes: 6 additions & 2 deletions src/wolff_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ get_metadata([Host | Rest], ConnConfig, Topic, Errors) ->
try
{ok, Vsns} = kpro:get_api_versions(Pid),
{_, Vsn} = maps:get(metadata, Vsns),
do_get_metadata(Vsn, Pid, Topic)
Timeout = maps:get(request_timeout, ConnConfig, ?DEFAULT_METADATA_TIMEOUT),
do_get_metadata(Vsn, Pid, Topic, Timeout)
after
_ = close_connection(Pid)
end;
Expand All @@ -409,8 +410,11 @@ get_metadata([Host | Rest], ConnConfig, Topic, Errors) ->
end.

do_get_metadata(Vsn, Connection, Topic) ->
do_get_metadata(Vsn, Connection, Topic, ?DEFAULT_METADATA_TIMEOUT).

do_get_metadata(Vsn, Connection, Topic, Timeout) ->
Req = kpro_req_lib:metadata(Vsn, [Topic], _IsAutoCreateAllowed = false),
case kpro:request_sync(Connection, Req, ?DEFAULT_METADATA_TIMEOUT) of
case kpro:request_sync(Connection, Req, Timeout) of
{ok, #kpro_rsp{msg = Meta}} ->
BrokersMeta = kpro:find(brokers, Meta),
Brokers = [parse_broker_meta(M) || M <- BrokersMeta],
Expand Down

0 comments on commit ee45341

Please sign in to comment.