diff --git a/README.md b/README.md index 17f0939..bda9732 100644 --- a/README.md +++ b/README.md @@ -27,3 +27,17 @@ end ## Getting started Check the [Getting started](https://hexdocs.pm/kafkaesque/getting-started.html) guide in Hexdocs. + +## Updating versions +To go from 1.0.0-rc.1 to 1.0.0-rc.2, an additional migration is needed: + +```elixir +defmodule MyApp.Migrations.BumpKafkaesque do + use Ecto.Migration + + def up, do: Kafkaesque.Migrations.up(:v1, :v2) + def down, do: Kafkaesque.Migrations.down(:v2, :v1) +end +``` + +No extra steps are required if 1.0.0-rc.2 or a newer version was installed. diff --git a/lib/kafkaesque.ex b/lib/kafkaesque.ex index fd8c1e2..abd1d06 100644 --- a/lib/kafkaesque.ex +++ b/lib/kafkaesque.ex @@ -50,10 +50,10 @@ defmodule Kafkaesque do use the library. See the documentation of the `Kafkaesque` module for more information. """ - @spec publish(Ecto.Repo.t(), String.t(), term(), String.t()) :: - {:ok, Message.t()} | {:error, atom()} - def publish(repo, topic, partition, payload) do - message = Message.new(topic, partition, payload) + @spec publish(Ecto.Repo.t(), String.t(), term(), String.t(), String.t()) :: + {:ok, Message.t()} | {:error, Ecto.Changeset.t()} + def publish(repo, topic, partition, key, payload) do + message = Message.new(topic, partition, key, payload) repo.insert(message) end @@ -108,11 +108,14 @@ defmodule Kafkaesque do {repo, _opts} = Keyword.pop!(opts, :repo) quote do - @spec publish(String.t(), term()) :: {:ok, Kafkaesque.Message.t()} | {:error, atom()} - def publish(topic, body) do + @on_definition {Kafkaesque.CompileHooks, :on_def} + + @spec publish(String.t(), String.t(), term()) :: + {:ok, Kafkaesque.Message.t()} | {:error, Ecto.Changeset.t()} + def publish(topic, key \\ "", body) do payload = encode(body) - partition = partition(topic, body) - Kafkaesque.publish(unquote(repo), topic, partition, payload) + partition = partition(topic, key, body) + Kafkaesque.publish(unquote(repo), topic, partition, key, payload) end @spec encode(term()) :: String.t() @@ -120,8 +123,8 @@ defmodule Kafkaesque do body end - @spec partition(String.t(), term()) :: integer() - def partition(_topic, _body) do + @spec partition(String.t(), String.t(), term()) :: integer() + def partition(_topic, _key, _body) do 0 end @@ -137,7 +140,7 @@ defmodule Kafkaesque do Kafkaesque.child_spec(opts) end - defoverridable encode: 1, partition: 2 + defoverridable encode: 1, partition: 3 end end end diff --git a/lib/kafkaesque/compile_hooks.ex b/lib/kafkaesque/compile_hooks.ex new file mode 100644 index 0000000..dde05a0 --- /dev/null +++ b/lib/kafkaesque/compile_hooks.ex @@ -0,0 +1,20 @@ +defmodule Kafkaesque.CompileHooks do + @moduledoc false + + # We need to raise a compile error if partition/2 is defined due to a breaking + # change: outbox modules are now expected to implement partition/3, not partition/2 + # + # This way we turn the breaking change into a compile error instead of runtime + # misbehaviour + def on_def(_env, :def, :partition, args, _guards, _body) do + if Enum.count(args) == 3 do + :ok + else + raise "partition function is expected to have 3 arguments" + end + end + + def on_def(_env, _kind, _name, _args, _guards, _body) do + :ok + end +end diff --git a/lib/kafkaesque/message.ex b/lib/kafkaesque/message.ex index fe0af1b..1f7df60 100644 --- a/lib/kafkaesque/message.ex +++ b/lib/kafkaesque/message.ex @@ -34,6 +34,7 @@ defmodule Kafkaesque.Message do ) field(:partition, :integer) + field(:key, :string, default: "") field(:body, :string) field(:attempt, :integer, default: 0) field(:attempted_by, :string) @@ -44,12 +45,13 @@ defmodule Kafkaesque.Message do timestamps() end - @spec new(String.t(), String.t(), String.t()) :: Ecto.Changeset.t() - def new(topic, partition, body) do + @spec new(String.t(), String.t(), String.t(), String.t()) :: Ecto.Changeset.t() + def new(topic, partition, key, body) do %__MODULE__{} - |> cast(%{topic: topic, partition: partition, body: body}, [ + |> cast(%{topic: topic, partition: partition, body: body, key: key}, [ :topic, :partition, + :key, :body ]) |> validate_required([:topic, :partition, :body]) diff --git a/lib/kafkaesque/migrations.ex b/lib/kafkaesque/migrations.ex index a377f6d..abede5f 100644 --- a/lib/kafkaesque/migrations.ex +++ b/lib/kafkaesque/migrations.ex @@ -1,12 +1,14 @@ defmodule Kafkaesque.Migrations do use Ecto.Migration - def up do + # Functions independent from version + def up() do create table(:kafkaesque_messages, primary_key: false) do add(:id, :bigserial, primary_key: true) add(:state, :string, null: false, default: "pending") add(:topic, :string, null: false) add(:partition, :integer, null: false) + add(:key, :binary, default: "") add(:body, :binary) add(:attempt, :integer, null: false, default: 0) add(:attempted_by, :string) @@ -17,7 +19,23 @@ defmodule Kafkaesque.Migrations do end end - def down do + def down() do drop(table(:kafkaesque_messages)) end + + def up(current, next) + + def up(:v1, :v2) do + alter table(:kafkaesque_messages) do + add(:key, :binary, default: "", null: false) + end + end + + def down(current, previous) + + def down(:v2, :v1) do + alter table(:kafkaesque_messages) do + drop(:key) + end + end end diff --git a/test/kafkaesque/message_test.exs b/test/kafkaesque/message_test.exs index 031b1da..7b71fae 100644 --- a/test/kafkaesque/message_test.exs +++ b/test/kafkaesque/message_test.exs @@ -8,20 +8,22 @@ defmodule Kafkaesque.MessageTest do topic = "sample" body = "body" partition = 0 + key = "some" assert %Ecto.Changeset{ errors: [], - changes: %{topic: ^topic, body: ^body, partition: ^partition} - } = Message.new(topic, partition, body) + changes: %{topic: ^topic, body: ^body, partition: ^partition, key: ^key} + } = Message.new(topic, partition, key, body) end test "returns invalid changeset for invalid input" do topic = 1 body = {1, 2} + key = 2 partition = "notanumber" - assert %Ecto.Changeset{errors: [topic: _, partition: _, body: _]} = - Message.new(topic, partition, body) + assert %Ecto.Changeset{errors: [topic: _, partition: _, key: _, body: _]} = + Message.new(topic, partition, key, body) end end end diff --git a/test/kafkaesque_test.exs b/test/kafkaesque_test.exs index e70f589..7c30d78 100644 --- a/test/kafkaesque_test.exs +++ b/test/kafkaesque_test.exs @@ -5,16 +5,23 @@ defmodule KafkaesqueTest do describe "publish/4" do test "inserts valid messages" do - {:ok, %Kafkaesque.Message{}} = Kafkaesque.publish(Repo, "test_topic", 0, "content") + {:ok, %Kafkaesque.Message{}} = Kafkaesque.publish(Repo, "test_topic", 0, "", "content") end test "errors for invalid messages" do invalid_topic = 1 invalid_body = {1, 2} + invalid_key = 2 invalid_partition = "notanumber" - assert {:error, %Ecto.Changeset{}} = - Kafkaesque.publish(Repo, invalid_topic, invalid_partition, invalid_body) + assert {:error, %Ecto.Changeset{errors: [topic: _, partition: _, key: _, body: _]}} = + Kafkaesque.publish( + Repo, + invalid_topic, + invalid_partition, + invalid_key, + invalid_body + ) end end end