Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: differentiate producers to the same topic with aliases #68

Merged
merged 15 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ ok = wolff:stop_producers(Producers).
ok = wolff:stop_client(Client).
```

If you want to use more than one producer pointing to the same topic, be sure to define an unique `alias` for each one to avoid clashes.

```erlang
Topic = <<"test-topic">>.
{ok, Producers1} = wolff:start_producers(Client, Topic, ProducerCfg#{alias => <<"a1">>}).
{ok, Producers2} = wolff:start_producers(Client, Topic, ProducerCfg#{alias => <<"a2">>}).
```

### Async Produce with Callback

```
Expand Down Expand Up @@ -182,7 +190,7 @@ defining telemetry events. Wolff defines such telemetry events. Users of Wolff
can attach functions to the events, for example, to record when a message has
been successfully sent to Kafka. Wolff's telemetry events are described in the
`wolff_metrics` module. One can read more about how to attach code to the
events in [Beam Telemetry's documentation](https://github.com/beam-telemetry/telemetry).
events in [Beam Telemetry's documentation](https://github.com/beam-telemetry/telemetry).
The third parameter of the Beam Telemetry handler function is a meta data map.
One can send a custom meta data map for each Kafka producer instance by setting
the Kafka producer configuration parameter `telemetry_meta_data` to the map one
Expand Down
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
* 2.0.0
- Added the `alias` producer config option to make producers to the same topic be independent.

* 1.10.4 (merge 1.5.14)
- Split batch if `message_too_large` error code is received.
Prior to this fix, `wolff_producer` would retry the same batch indefinitely for any error code received from Kafka (`message_too_large` included).
Expand Down
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ services:
zookeeper:
image: wurstmeister/zookeeper
container_name: wolff-zk
ulimits:
nofile:
soft: 65536
hard: 65536
kafka_1:
depends_on:
- zookeeper
Expand Down
5 changes: 5 additions & 0 deletions include/wolff.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,9 @@
%% is inserted to cache the partition count.
-define(WOLFF_PRODUCERS_GLOBAL_TABLE, wolff_producers_global).

-define(ALIASED_TOPIC(ALIAS, TOPIC), {ALIAS, TOPIC}).
-define(NO_ALIAS, no_alias).

-define(all_partitions, all_partitions).

-endif.
2 changes: 1 addition & 1 deletion src/wolff.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, wolff,
[{description, "Kafka's publisher"},
{vsn, "1.10.4"},
{vsn, "2.0.0"},
{registered, []},
{applications,
[kernel,
Expand Down
2 changes: 1 addition & 1 deletion src/wolff.appup.src
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%% -*- mode: erlang; -*-
{"1.10.4",
{"2.0.0",
[
],
[
Expand Down
17 changes: 3 additions & 14 deletions src/wolff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

-module(wolff).

-include("wolff.hrl").

%% Supervised client management APIs
-export([ensure_supervised_client/3,
stop_and_delete_supervised_client/1
Expand All @@ -26,10 +28,7 @@

%% Supervised producer management APIs
-export([ensure_supervised_producers/3,
stop_and_delete_supervised_producers/1,
stop_and_delete_supervised_producers/2,
%% /3 is deprecated, call /2 instead
stop_and_delete_supervised_producers/3
stop_and_delete_supervised_producers/1
]).

%% Messaging APIs
Expand All @@ -49,7 +48,6 @@
name/0, offset_reply/0, topic/0]).

-deprecated({check_if_topic_exists, 3}).
-deprecated({stop_and_delete_supervised_producers, 3}).

-type client_id() :: binary().
-type host() :: kpro:endpoint().
Expand Down Expand Up @@ -97,15 +95,6 @@ stop_producers(Producers) ->
ensure_supervised_producers(ClientId, Topic, ProducerCfg) ->
wolff_producers:start_supervised(ClientId, Topic, ProducerCfg).

%% @hidden Deprecated.
-spec stop_and_delete_supervised_producers(client_id(), topic(), name()) -> ok.
stop_and_delete_supervised_producers(ClientId, Topic, _Name) ->
stop_and_delete_supervised_producers(ClientId, Topic).

%% @doc Ensure supervised producers are stopped then deleted.
stop_and_delete_supervised_producers(ClientId, Topic) ->
wolff_producers:stop_supervised(ClientId, Topic).

%% @doc Ensure supervised producers are stopped then deleted.
-spec stop_and_delete_supervised_producers(wolff_producers:producers()) -> ok.
stop_and_delete_supervised_producers(Producers) ->
Expand Down
Loading
Loading