From 53eb831ca4abcc0f2de6a8a9686bd13fc4767ca6 Mon Sep 17 00:00:00 2001 From: Jeff Grunewald Date: Sun, 21 Nov 2021 18:56:43 -0500 Subject: [PATCH] updating to brod 3.16 and new kafka resp schemas (#100) * updating to brod 3.16 and new kafka resp schemas * bumping minimum supported elixir version --- .github/workflows/main.yml | 12 +++++----- .github/workflows/master.yml | 4 ++-- .github/workflows/release.yml | 4 ++-- .tool-versions | 2 ++ lib/elsa/partitioner/random.ex | 2 +- lib/elsa/topic.ex | 27 ++++++++++++---------- lib/elsa/util.ex | 24 +++++++++++--------- mix.exs | 6 ++--- mix.lock | 17 +++++++------- test/integration/elsa/consumer_test.exs | 4 ++-- test/unit/elsa/topic_test.exs | 30 ++++++++++++------------- 11 files changed, 70 insertions(+), 62 deletions(-) create mode 100644 .tool-versions diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a8265ca..44bf74e 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -11,8 +11,8 @@ jobs: - uses: actions/checkout@v2 - uses: erlef/setup-beam@v1 with: - otp-version: 21.3 - elixir-version: 1.8.2 + otp-version: 22.3 + elixir-version: 1.10.4 - name: Get depedencies run: | mix local.rebar --force @@ -28,8 +28,8 @@ jobs: - uses: actions/checkout@v2 - uses: erlef/setup-beam@v1 with: - otp-version: 21.3 - elixir-version: 1.8.2 + otp-version: 22.3 + elixir-version: 1.10.4 - name: Get dependencies run: | mix local.rebar --force @@ -45,8 +45,8 @@ jobs: - uses: actions/checkout@v2 - uses: erlef/setup-beam@v1 with: - otp-version: 21.3 - elixir-version: 1.8.2 + otp-version: 22.3 + elixir-version: 1.10.4 - name: Retrieve cached PLT uses: actions/cache@v1 with: diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index 4d787ee..3d9ef6b 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -11,8 +11,8 @@ jobs: - uses: actions/checkout@v2 - uses: erlef/setup-beam@v1 with: - otp-version: 21.3 - elixir-version: 1.8.2 + otp-version: 22.3 + elixir-version: 1.10.4 - name: Get dependencies run: | mix local.rebar --force diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 7cae6e2..6432cac 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -10,8 +10,8 @@ jobs: - uses: actions/checkout@v2 - uses: erlef/setup-beam@v1 with: - otp-version: 21.3 - elixir-version: 1.8.2 + otp-version: 22.3 + elixir-version: 1.10.4 - name: Build run: | mix local.rebar --force diff --git a/.tool-versions b/.tool-versions new file mode 100644 index 0000000..49aa862 --- /dev/null +++ b/.tool-versions @@ -0,0 +1,2 @@ +erlang 22.3.4.21 +elixir 1.10.4-otp-22 diff --git a/lib/elsa/partitioner/random.ex b/lib/elsa/partitioner/random.ex index 608e2d8..c0ed603 100644 --- a/lib/elsa/partitioner/random.ex +++ b/lib/elsa/partitioner/random.ex @@ -7,6 +7,6 @@ defmodule Elsa.Partitioner.Random do @behaviour Elsa.Partitioner def partition(count, _key) do - :crypto.rand_uniform(0, count) + :rand.uniform(count) - 1 end end diff --git a/lib/elsa/topic.ex b/lib/elsa/topic.ex index f6034a7..7c9d9d3 100644 --- a/lib/elsa/topic.ex +++ b/lib/elsa/topic.ex @@ -16,9 +16,9 @@ defmodule Elsa.Topic do {:ok, metadata} = :brod.get_metadata(reformat_endpoints(endpoints), :all) topics = - metadata.topic_metadata + metadata.topics |> Enum.map(fn topic_metadata -> - {topic_metadata.topic, Enum.count(topic_metadata.partition_metadata)} + {topic_metadata.name, Enum.count(topic_metadata.partitions)} end) {:ok, topics} @@ -48,14 +48,14 @@ defmodule Elsa.Topic do config = opts |> Keyword.get(:config, []) - |> Enum.map(fn {key, val} -> %{config_name: to_string(key), config_value: val} end) + |> Enum.map(fn {key, val} -> %{name: to_string(key), value: val} end) create_topic_args = %{ - topic: topic, + name: topic, num_partitions: Keyword.get(opts, :partitions, 1), replication_factor: Keyword.get(opts, :replicas, 1), - replica_assignment: [], - config_entries: config + assignments: [], + configs: config } version = Elsa.Util.get_api_version(connection, :create_topics) @@ -88,15 +88,18 @@ defmodule Elsa.Topic do defp check_response(response) do message = kpro_rsp(response, :msg) - error_key = - case Map.has_key?(message, :topic_errors) do - true -> :topic_errors - false -> :topic_error_codes + response_key = + case Map.has_key?(message, :topics) do + true -> :topics + false -> :responses end - case Enum.find(message[error_key], fn error -> error.error_code != :no_error end) do + case Enum.find(message[response_key], fn response -> response.error_code != :no_error end) do nil -> :ok - error -> {:error, {error.error_code, error[:error_message]}} + response -> {:error, {response.error_code, resp_error_msg(response, response_key)}} end end + + defp resp_error_msg(response, :topics), do: response.error_message + defp resp_error_msg(_response, :responses), do: :delete_topic_error end diff --git a/lib/elsa/util.ex b/lib/elsa/util.ex index a739797..4c96e54 100644 --- a/lib/elsa/util.ex +++ b/lib/elsa/util.ex @@ -109,21 +109,25 @@ defmodule Elsa.Util do def partition_count(endpoints, topic) when is_list(endpoints) do {:ok, metadata} = :brod.get_metadata(reformat_endpoints(endpoints), [topic]) - metadata.topic_metadata - |> Enum.map(fn topic_metadata -> - Enum.count(topic_metadata.partition_metadata) - end) - |> hd() + count_partitions(metadata) end def partition_count(connection, topic) when is_atom(connection) or is_pid(connection) do {:ok, metadata} = :brod_client.get_metadata(connection, topic) - metadata.topic_metadata - |> Enum.map(fn topic_metadata -> - Enum.count(topic_metadata.partition_metadata) - end) - |> hd() + count_partitions(metadata) + end + + # Handle brod < 3.16 + defp count_partitions(%{topic_metadata: topic_metadatas}) do + [count | _] = for %{partition_metadata: metadata} <- topic_metadatas, do: Enum.count(metadata) + count + end + + # Handle brod 3.16+ + defp count_partitions(%{topics: topics}) do + [count | _] = for %{partitions: partitions} <- topics, do: Enum.count(partitions) + count end defp connect(endpoints, :controller), do: :kpro.connect_controller(endpoints, []) diff --git a/mix.exs b/mix.exs index 156e4f7..bf1fafa 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Elsa.MixProject do use Mix.Project - @version "1.0.0-rc.2" + @version "1.0.0-rc.3" @github "https://github.com/bbalser/elsa" def project do @@ -9,7 +9,7 @@ defmodule Elsa.MixProject do app: :elsa, name: "Elsa", version: @version, - elixir: "~> 1.8", + elixir: "~> 1.10", start_permanent: Mix.env() == :prod, description: description(), package: package(), @@ -30,7 +30,7 @@ defmodule Elsa.MixProject do defp deps do [ - {:brod, "~> 3.14.0"}, + {:brod, "~> 3.16"}, {:patiently, "~> 0.2", only: [:dev, :test, :integration]}, {:divo, "~> 1.3", only: [:dev, :test, :integration], override: true}, {:divo_kafka, "~> 0.1.7", only: [:dev, :test, :integration]}, diff --git a/mix.lock b/mix.lock index 6ba0615..60879ee 100644 --- a/mix.lock +++ b/mix.lock @@ -1,23 +1,22 @@ %{ - "brod": {:hex, :brod, "3.14.0", "f959408e88acd0feca22f6a43ca26e70201c6e5c57dc74b87f08ef65a5e7fe18", [:rebar3], [{:kafka_protocol, "2.3.6", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.11", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm", "a0153437835b810d93e79c7000f55b0c4bd62c281a3224a2615f6aa72b52d484"}, + "brod": {:hex, :brod, "3.16.1", "1c7b03f99c7cc310de5511cadad9879ab0cc5f1a2612211e68c26dad517d31b0", [:rebar3], [{:kafka_protocol, "4.0.1", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:snappyer, "1.2.8", [hex: :snappyer, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.11", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm", "8297c47cd1ff0657955027fa1beb62edfaab1cc5e09b714cc29bd7f1c8d40083"}, "checkov": {:hex, :checkov, "1.0.0", "cecf1be22ea506b2fbd6741d7c00f4876bb2be76ea1b95493c25b51028f24410", [:mix], [], "hexpm", "9fa85e6fdf1bcec2dd0d996d0c1e5a83e336dafb97c931232af1cb1e7ef4420a"}, - "crc32cer": {:hex, :crc32cer, "0.1.4", "a656dff19474d1a1fc5bb0081610ab6b0695b23affc47fa90abeb079a8ef9752", [:rebar3], [], "hexpm", "964735a5422cf65bbc5354860a560fff546f0026f83f8860525bd58ab5bade5d"}, + "crc32cer": {:hex, :crc32cer, "0.1.8", "c6c2275c5fb60a95f4935d414f30b50ee9cfed494081c9b36ebb02edfc2f48db", [:rebar3], [], "hexpm", "251499085482920deb6c9b7aadabf9fb4c432f96add97ab42aee4501e5b6f591"}, "dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"}, "divo": {:hex, :divo, "1.3.1", "a7cdb05d4525a9703e11dbcf40567d426b546f8e816b9c9465232c10bc6a257b", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:patiently, "~> 0.2", [hex: :patiently, repo: "hexpm", optional: false]}], "hexpm", "b3edef7baf068bbf864c01c8f3ed06fcf81c08e8d4adcc1cfc4b7d6eb69c6a18"}, "divo_kafka": {:hex, :divo_kafka, "0.1.7", "e8253bb735e001c41f35645ac0429740b6b6350ceb0ae268609f769f0b3883c5", [:mix], [{:divo, "~> 1.1", [hex: :divo, repo: "hexpm", optional: false]}], "hexpm", "25f9b89a1f59f6801b8b1e044eaa8cdce4e0756b4a8512458ea31f9c99ec338f"}, - "earmark": {:hex, :earmark, "1.4.5", "62ffd3bd7722fb7a7b1ecd2419ea0b458c356e7168c1f5d65caf09b4fbdd13c8", [:mix], [], "hexpm", "b7d0e6263d83dc27141a523467799a685965bf8b13b6743413f19a7079843f4f"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.15", "b29e8e729f4aa4a00436580dcc2c9c5c51890613457c193cc8525c388ccb2f06", [:mix], [], "hexpm", "044523d6438ea19c1b8ec877ec221b008661d3c27e3b848f4c879f500421ca5c"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.17", "6f3c7e94170377ba45241d394389e800fb15adc5de51d0a3cd52ae766aafd63f", [:mix], [], "hexpm", "f93ac89c9feca61c165b264b5837bf82344d13bebc634cd575cb711e2e342023"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, - "ex_doc": {:hex, :ex_doc, "0.25.3", "3edf6a0d70a39d2eafde030b8895501b1c93692effcbd21347296c18e47618ce", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "9ebebc2169ec732a38e9e779fd0418c9189b3ca93f4a676c961be6c1527913f5"}, + "ex_doc": {:hex, :ex_doc, "0.25.5", "ac3c5425a80b4b7c4dfecdf51fa9c23a44877124dd8ca34ee45ff608b1c6deb9", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "688cfa538cdc146bc4291607764a7f1fcfa4cce8009ecd62de03b27197528350"}, "jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"}, - "kafka_protocol": {:hex, :kafka_protocol, "2.3.6", "df076a8ef49fffae3535c805cb00f3a057ce1895e63398bf8a10569eeeac02f8", [:rebar, :rebar3], [{:crc32cer, "0.1.4", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:snappyer, "1.2.5", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "7cb061fe46babc7fd269d2c0e5b4dba5d1efc4f7dacce85b17a9cca973106b23"}, + "kafka_protocol": {:hex, :kafka_protocol, "4.0.1", "fc696880c73483c8b032c4bb60f2873046035c7824e1edcb924cfce643cf23dd", [:rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "687bfd9989998ec8fbbc3ed50d1239a6c07a7dc15b52914ad477413b89ecb621"}, "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.15.2", "dc72dfe17eb240552857465cc00cce390960d9a0c055c4ccd38b70629227e97c", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "fd23ae48d09b32eff49d4ced2b43c9f086d402ee4fd4fcb2d7fad97fa8823e75"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, "meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, - "nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.2.0", "b44d75e2a6542dcb6acf5d71c32c74ca88960421b6874777f79153bbbbd7dccc", [:mix], [], "hexpm", "52b2871a7515a5ac49b00f214e4165a40724cf99798d8e4a65e4fd64ebd002c1"}, "patiently": {:hex, :patiently, "0.2.0", "67eb139591e10c4b363ae0198e832552f191c58894731efd3bf124ec4722267a", [:mix], [], "hexpm", "c08cc5edc27def565647a9b55a0bea8025a5f81a4472e57692f28f2292c44c94"}, "placebo": {:hex, :placebo, "2.0.0", "c0e773dec77e941bcbcc14d10b759f2d66775aff9b75051f3e41939b64300e81", [:mix], [{:meck, "~> 0.9", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "e0872cec8848d7e59ba96396f45ee1ad34662c689c86ba6190694d38b4289844"}, - "snappyer": {:hex, :snappyer, "1.2.5", "9154b9ac84031f0a799f72a4aa87df23ab2193b5631475fa2cdc304382d2df77", [:rebar3], [], "hexpm", "d2adc26a81efd5f138397a38a0bb545188d302972721f8be0de37fa452c8aed7"}, + "snappyer": {:hex, :snappyer, "1.2.8", "201ce9067a33c71a6a5087c0c3a49a010b17112d461e6df696c722dcb6d0934a", [:rebar3], [], "hexpm", "35518e79a28548b56d8fd6aee2f565f12f51c2d3d053f9cfa817c83be88c4f3d"}, "supervisor3": {:hex, :supervisor3, "1.1.11", "d81cdec31d102fde407423e1d05b569572850deebed86b951d5233c387cba80b", [:rebar3], [], "hexpm", "e6c2dedbcabcba24995a218aca12db5e208b80d3252692b22ef0f1a266104b50"}, } diff --git a/test/integration/elsa/consumer_test.exs b/test/integration/elsa/consumer_test.exs index fd37106..e77a9aa 100644 --- a/test/integration/elsa/consumer_test.exs +++ b/test/integration/elsa/consumer_test.exs @@ -25,8 +25,8 @@ defmodule Elsa.ConsumerTest do send_messages(topic, ["message1", "message2"]) - assert_receive {:message, %{topic: topic, partition: 0, offset: _, key: "", value: "message1"}}, 5_000 - assert_receive {:message, %{topic: topic, partition: 1, offset: _, key: "", value: "message2"}}, 5_000 + assert_receive {:message, %{topic: ^topic, partition: 0, offset: _, key: "", value: "message1"}}, 5_000 + assert_receive {:message, %{topic: ^topic, partition: 1, offset: _, key: "", value: "message2"}}, 5_000 end test "Elsa.Consumer will hand messages to the handler without state" do diff --git a/test/unit/elsa/topic_test.exs b/test/unit/elsa/topic_test.exs index b4f9027..dbc4bc5 100644 --- a/test/unit/elsa/topic_test.exs +++ b/test/unit/elsa/topic_test.exs @@ -26,11 +26,11 @@ defmodule Elsa.TopicTest do allow :kpro_req_lib.create_topics(any(), any(), any()), return: :topic_request message = %{ - topic_errors: [ + topics: [ %{ error_code: :topic_already_exists, error_message: "Topic 'elsa-topic' already exists.", - topic: "elsa-topic" + name: "elsa-topic" } ] } @@ -54,10 +54,10 @@ defmodule Elsa.TopicTest do allow :kpro_req_lib.delete_topics(any(), any(), any()), return: :topic_request message = %{ - topic_error_codes: [ + responses: [ %{ error_code: :topic_doesnt_exist, - topic: "elsa-topic" + name: "elsa-topic" } ] } @@ -70,21 +70,21 @@ defmodule Elsa.TopicTest do internal_result = function.(:connection) - assert {:error, {:topic_doesnt_exist, nil}} == internal_result + assert {:error, {:topic_doesnt_exist, :delete_topic_error}} == internal_result end end describe "list_topics/1" do test "extracts topics and partitions as a list of tuples" do metadata = %{ - topic_metadata: [ + topics: [ %{ - partition_metadata: [%{partition: 0}], - topic: "elsa-other-topic" + partitions: [%{partition: 0}], + name: "elsa-other-topic" }, %{ - partition_metadata: [%{partition: 0}, %{partition: 1}], - topic: "elsa-topic" + partitions: [%{partition: 0}, %{partition: 1}], + name: "elsa-topic" } ] } @@ -102,14 +102,14 @@ defmodule Elsa.TopicTest do describe "exists?/2" do test "returns a boolean identifying the presence of a given topic" do metadata = %{ - topic_metadata: [ + topics: [ %{ - partition_metadata: [%{partition: 0}], - topic: "elsa-other-topic" + partitions: [%{partition: 0}], + name: "elsa-other-topic" }, %{ - partition_metadata: [%{partition: 0}, %{partition: 1}], - topic: "elsa-topic" + partitions: [%{partition: 0}, %{partition: 1}], + name: "elsa-topic" } ] }