Skip to content

Commit

Permalink
Add key support
Browse files Browse the repository at this point in the history
  • Loading branch information
v0idpwn committed Nov 2, 2023
1 parent c9a356c commit 1745053
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 23 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,17 @@ end
## Getting started

Check the [Getting started](https://hexdocs.pm/kafkaesque/getting-started.html) guide in Hexdocs.

## Updating versions
To go from 1.0.0-rc.1 to 1.0.0-rc.2, an additional migration is needed:

```elixir
defmodule MyApp.Migrations.BumpKafkaesque do
use Ecto.Migration

def up, do: Kafkaesque.Migrations.up(:v1, :v2)
def down, do: Kafkaesque.Migrations.down(:v2, :v1)
end
```

No extra steps are required if 1.0.0-rc.2 or a newer version was installed.
25 changes: 14 additions & 11 deletions lib/kafkaesque.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ defmodule Kafkaesque do
use the library. See the documentation of the `Kafkaesque` module for more
information.
"""
@spec publish(Ecto.Repo.t(), String.t(), term(), String.t()) ::
{:ok, Message.t()} | {:error, atom()}
def publish(repo, topic, partition, payload) do
message = Message.new(topic, partition, payload)
@spec publish(Ecto.Repo.t(), String.t(), term(), String.t(), String.t()) ::
{:ok, Message.t()} | {:error, Ecto.Changeset.t()}
def publish(repo, topic, partition, key, payload) do
message = Message.new(topic, partition, key, payload)
repo.insert(message)
end

Expand Down Expand Up @@ -108,20 +108,23 @@ defmodule Kafkaesque do
{repo, _opts} = Keyword.pop!(opts, :repo)

quote do
@spec publish(String.t(), term()) :: {:ok, Kafkaesque.Message.t()} | {:error, atom()}
def publish(topic, body) do
@on_definition {Kafkaesque.CompileHooks, :on_def}

@spec publish(String.t(), String.t(), term()) ::
{:ok, Kafkaesque.Message.t()} | {:error, Ecto.Changeset.t()}
def publish(topic, key \\ "", body) do
payload = encode(body)
partition = partition(topic, body)
Kafkaesque.publish(unquote(repo), topic, partition, payload)
partition = partition(topic, key, body)
Kafkaesque.publish(unquote(repo), topic, partition, key, payload)
end

@spec encode(term()) :: String.t()
def encode(body) do
body
end

@spec partition(String.t(), term()) :: integer()
def partition(_topic, _body) do
@spec partition(String.t(), String.t(), term()) :: integer()
def partition(_topic, _key, _body) do
0
end

Expand All @@ -137,7 +140,7 @@ defmodule Kafkaesque do
Kafkaesque.child_spec(opts)
end

defoverridable encode: 1, partition: 2
defoverridable encode: 1, partition: 3
end
end
end
20 changes: 20 additions & 0 deletions lib/kafkaesque/compile_hooks.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Kafkaesque.CompileHooks do
@moduledoc false

# We need to raise a compile error if partition/2 is defined due to a breaking
# change: outbox modules are now expected to implement partition/3, not partition/2
#
# This way we turn the breaking change into a compile error instead of runtime
# misbehaviour
def on_def(_env, :def, :partition, args, _guards, _body) do
if Enum.count(args) == 3 do
:ok
else
raise "partition function is expected to have 3 arguments"
end
end

def on_def(_env, _kind, _name, _args, _guards, _body) do
:ok
end
end
8 changes: 5 additions & 3 deletions lib/kafkaesque/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ defmodule Kafkaesque.Message do
)

field(:partition, :integer)
field(:key, :string, default: "")
field(:body, :string)
field(:attempt, :integer, default: 0)
field(:attempted_by, :string)
Expand All @@ -44,12 +45,13 @@ defmodule Kafkaesque.Message do
timestamps()
end

@spec new(String.t(), String.t(), String.t()) :: Ecto.Changeset.t()
def new(topic, partition, body) do
@spec new(String.t(), String.t(), String.t(), String.t()) :: Ecto.Changeset.t()
def new(topic, partition, key, body) do
%__MODULE__{}
|> cast(%{topic: topic, partition: partition, body: body}, [
|> cast(%{topic: topic, partition: partition, body: body, key: key}, [
:topic,
:partition,
:key,
:body
])
|> validate_required([:topic, :partition, :body])
Expand Down
22 changes: 20 additions & 2 deletions lib/kafkaesque/migrations.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
defmodule Kafkaesque.Migrations do
use Ecto.Migration

def up do
# Functions independent from version
def up() do
create table(:kafkaesque_messages, primary_key: false) do
add(:id, :bigserial, primary_key: true)
add(:state, :string, null: false, default: "pending")
add(:topic, :string, null: false)
add(:partition, :integer, null: false)
add(:key, :binary, default: "")
add(:body, :binary)
add(:attempt, :integer, null: false, default: 0)
add(:attempted_by, :string)
Expand All @@ -17,7 +19,23 @@ defmodule Kafkaesque.Migrations do
end
end

def down do
def down() do
drop(table(:kafkaesque_messages))
end

def up(current, next)

def up(:v1, :v2) do
alter table(:kafkaesque_messages) do
add(:key, :binary, default: "", null: false)
end
end

def down(current, previous)

def down(:v2, :v1) do
alter table(:kafkaesque_messages) do
drop(:key)
end
end
end
10 changes: 6 additions & 4 deletions test/kafkaesque/message_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,22 @@ defmodule Kafkaesque.MessageTest do
topic = "sample"
body = "body"
partition = 0
key = "some"

assert %Ecto.Changeset{
errors: [],
changes: %{topic: ^topic, body: ^body, partition: ^partition}
} = Message.new(topic, partition, body)
changes: %{topic: ^topic, body: ^body, partition: ^partition, key: ^key}
} = Message.new(topic, partition, key, body)
end

test "returns invalid changeset for invalid input" do
topic = 1
body = {1, 2}
key = 2
partition = "notanumber"

assert %Ecto.Changeset{errors: [topic: _, partition: _, body: _]} =
Message.new(topic, partition, body)
assert %Ecto.Changeset{errors: [topic: _, partition: _, key: _, body: _]} =
Message.new(topic, partition, key, body)
end
end
end
13 changes: 10 additions & 3 deletions test/kafkaesque_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,23 @@ defmodule KafkaesqueTest do

describe "publish/4" do
test "inserts valid messages" do
{:ok, %Kafkaesque.Message{}} = Kafkaesque.publish(Repo, "test_topic", 0, "content")
{:ok, %Kafkaesque.Message{}} = Kafkaesque.publish(Repo, "test_topic", 0, "", "content")
end

test "errors for invalid messages" do
invalid_topic = 1
invalid_body = {1, 2}
invalid_key = 2
invalid_partition = "notanumber"

assert {:error, %Ecto.Changeset{}} =
Kafkaesque.publish(Repo, invalid_topic, invalid_partition, invalid_body)
assert {:error, %Ecto.Changeset{errors: [topic: _, partition: _, key: _, body: _]}} =
Kafkaesque.publish(
Repo,
invalid_topic,
invalid_partition,
invalid_key,
invalid_body
)
end
end
end

0 comments on commit 1745053

Please sign in to comment.