diff --git a/README.md b/README.md index 57a012b..1f562fa 100644 --- a/README.md +++ b/README.md @@ -126,8 +126,9 @@ wolff:send(Producers, [Msg], AckFun). a delay before trying to reconnect. * Connection level configs are merged into `wolff` client config, including: - `iconnect_timeout`, `client_id`, `extra_sock_opts`, `query_api_versions`, - `request_timeout`, `sasl` and `ssl`. Ref: [kpro_connection.erl](https://github.com/klarna/kafka_protocol/blob/master/src/kpro_connection.erl) + `connect_timeout`, `client_id`, `extra_sock_opts`, `query_api_versions`, + `request_timeout`, `sasl` and `ssl`. + Ref: [kpro_connection.erl](https://github.com/klarna/kafka_protocol/blob/master/src/kpro_connection.erl) ## Producer Config diff --git a/changelog.md b/changelog.md index a88c051..901d81d 100644 --- a/changelog.md +++ b/changelog.md @@ -4,6 +4,7 @@ Since 1.7.0, there there is a better integration for metrics. - For supervised producers, use a global ets table (named `wolff_producers_global`) to store producer workers. This should avoid having to create an atom for each supervised topic producer. + - Respect `request_timeout` in connection config when fetching metadta. * 1.8.0 - Add wolff:check_if_topic_exists/2 for checking if a topic exists making use of an existing client process. [#52](https://github.com/kafka4beam/wolff/pull/52) diff --git a/src/wolff_client.erl b/src/wolff_client.erl index dda8143..ae35bc0 100644 --- a/src/wolff_client.erl +++ b/src/wolff_client.erl @@ -400,7 +400,8 @@ get_metadata([Host | Rest], ConnConfig, Topic, Errors) -> try {ok, Vsns} = kpro:get_api_versions(Pid), {_, Vsn} = maps:get(metadata, Vsns), - do_get_metadata(Vsn, Pid, Topic) + Timeout = maps:get(request_timeout, ConnConfig, ?DEFAULT_METADATA_TIMEOUT), + do_get_metadata(Vsn, Pid, Topic, Timeout) after _ = close_connection(Pid) end; @@ -409,8 +410,11 @@ get_metadata([Host | Rest], ConnConfig, Topic, Errors) -> end. do_get_metadata(Vsn, Connection, Topic) -> + do_get_metadata(Vsn, Connection, Topic, ?DEFAULT_METADATA_TIMEOUT). + +do_get_metadata(Vsn, Connection, Topic, Timeout) -> Req = kpro_req_lib:metadata(Vsn, [Topic], _IsAutoCreateAllowed = false), - case kpro:request_sync(Connection, Req, ?DEFAULT_METADATA_TIMEOUT) of + case kpro:request_sync(Connection, Req, Timeout) of {ok, #kpro_rsp{msg = Meta}} -> BrokersMeta = kpro:find(brokers, Meta), Brokers = [parse_broker_meta(M) || M <- BrokersMeta],