Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add one piece commanded router #132

Merged
merged 1 commit into from
May 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/one_piece_commanded/.formatter.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
locals_without_parens = [
register_type: 2,
import_type_provider: 1
import_type_provider: 1,
dispatch_transaction_script: 2
]

[
Expand Down
20 changes: 11 additions & 9 deletions apps/one_piece_commanded/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,33 @@

## Unreleased

- Added `OnePiece.Commanded.CommandRouter` module.

## v0.19.1 - 2023-10-04

- Fixed `new/1` typespec of `OnePiece.Commanded.ValueObject`.

## v0.19.0 - 2023-06-15

`OnePiece.Commanded.ValueObject` implements `Ecto.Type`, it means that `cast/2`, `dump/2`, and `load/2` are
added to every module that uses `OnePiece.Commanded.ValueObject`. This will allow you to use
added to every module that uses `OnePiece.Commanded.ValueObject`. This will allow you to use
`OnePiece.Commanded.ValueObject` as a field in your `Ecto.Schema` and as custom type in your
`OnePiece.Commanded.Aggregate`, `OnePiece.Commanded.Command`, and `OnePiece.Commanded.Event`.

Added support for custom types for `OnePiece.Commanded.Aggregate`, `OnePiece.Commanded.Command`, and
`OnePiece.Commanded.Event`. This will allow you to have Custom aggregate identity. Read more about at
Added support for custom types for `OnePiece.Commanded.Aggregate`, `OnePiece.Commanded.Command`, and
`OnePiece.Commanded.Event`. This will allow you to have Custom aggregate identity. Read more about at
https://hexdocs.pm/commanded/commands.html#define-aggregate-identity under "Custom aggregate identity".

```elixir
defmodule AccountNumber do
use OnePiece.Commanded.ValueObject

embedded_schema do
field :account_number, :string
field :branch, :string
end
# You must implement `String.Chars` protocol in order to work when dispatching the Command.

# You must implement `String.Chars` protocol in order to work when dispatching the Command.
defimpl String.Chars do
def to_string(%AccountNumber{branch: branch, account_number: account_number}) do
branch <> ":" <> account_number
Expand All @@ -45,7 +47,7 @@ end

defmodule DepositAccountOpened do
use OnePiece.Commanded.Event,
aggregate_identifier: {:account_number, AccountNumber}
aggregate_identifier: {:account_number, AccountNumber}

embedded_schema do
# ...
Expand All @@ -54,7 +56,7 @@ end

defmodule OpenDepositAccount do
use OnePiece.Commanded.Command,
aggregate_identifier: {:account_number, AccountNumber}
aggregate_identifier: {:account_number, AccountNumber}

embedded_schema do
# ...
Expand All @@ -73,7 +75,7 @@ end
## v0.16.0 - 2023-04-11

- Added `OnePiece.Commanded.EventStore.JsonbSerializer` module.

## v0.15.2 - 2023-03-14

- Fix casting already cast structs in `OnePiece.Commanded.ValueObject`.
Expand Down
17 changes: 16 additions & 1 deletion apps/one_piece_commanded/lib/one_piece/commanded/command.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ defmodule OnePiece.Commanded.Command do
@doc """
Converts the module into an `t:t/0`.

### Options

- `:aggregate_identifier` - The aggregate identifier key.
- `:stream_prefix` (optional) - The prefix to be used for the identity.

## Using

- `OnePiece.Commanded.ValueObject`
Expand Down Expand Up @@ -53,12 +58,14 @@ defmodule OnePiece.Commanded.Command do
end
end
"""
@spec __using__(opts :: [aggregate_identifier: aggregate_identifier_opt()]) :: any()
@spec __using__(opts :: [aggregate_identifier: aggregate_identifier_opt(), stream_prefix: String.t() | nil]) :: any()
defmacro __using__(opts \\ []) do
unless Keyword.has_key?(opts, :aggregate_identifier) do
raise ArgumentError, "missing :aggregate_identifier key"
end

stream_prefix = Keyword.get(opts, :stream_prefix)

{aggregate_identifier, aggregate_identifier_type} =
opts
|> Keyword.fetch!(:aggregate_identifier)
Expand All @@ -81,6 +88,14 @@ defmodule OnePiece.Commanded.Command do
def aggregate_identifier do
unquote(aggregate_identifier)
end

@doc """
Returns `#{inspect(unquote(stream_prefix))}` as the identity prefix.
"""
@spec stream_prefix :: String.t() | nil
def stream_prefix do
unquote(stream_prefix)
end
end
end
end
159 changes: 159 additions & 0 deletions apps/one_piece_commanded/lib/one_piece/commanded/command_router.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
defmodule OnePiece.Commanded.CommandRouter do
@moduledoc """
Command routing macro to allow configuration of each command to its command handler.

Please read more about `Commanded.Commands.Router`.
"""

@doc """
It calls `Commanded.Commands.Router.__using__/1` and imports extra macros.
"""
defmacro __using__(opts) do
quote do
use Commanded.Commands.Router, unquote(opts)
import OnePiece.Commanded.CommandRouter
end
end

@doc """
Configure the command to be dispatched to the same command module.

## Example

defmodule Router do
use OnePiece.Commanded.CommandRouter
register_transaction_script OpenBankAccount,
aggregate: BankAccount
end

defmodule Router do
use OnePiece.Commanded.CommandRouter
# the aggregate module is `OpenBankAccount.Aggregate`
register_transaction_script OpenBankAccount
end


Similar to `Commanded.Commands.Router.dispatch/2` except that,

1. It uses the command module as the command handler as well.
2. It uses the command module's aggregate identifier configuration.
3. When no lifespan module is provided, it uses `OnePiece.Commanded.Aggregate.StatelessLifespan`. as the lifespan
module to avoid OOM errors. Reliability is more important than performance.
4. When no aggregate module is provided, it uses the command module name suffixed concat with `Aggregate` as the
aggregate module.
As an example, if the command module is `OpenBankAccount`, then the aggregate module is
`OpenBankAccount.Aggregate`.
5. Drops some options to avoid conflicts with the command module configuration.

### About the Transaction Script naming

The naming comes from [Transaction Script from Martin Fowler](https://martinfowler.com/eaaCatalog/transactionScript.html),
although it is not exactly the same since the original name is about database transactions, the key part is the
following sentence:

> A Transaction Script organizes all this logic primarily as a single procedure.

This is the idea behind the `register_transaction_script/2` macro is to have the strongest cohesion possible for a
given Use Case (Command in this case).
Discouraging patterns around a single Command Handler that handles multiple commands, that over time, can become a
a [God Module](https://en.wikipedia.org/wiki/God_object),

God Modules that becomes difficult to maintain and understand. Having to come up with weird names for functions, scroll
pass hundreds of lines of code to find the right piece of code to change, or bike-shedding around locations of the
code, to the point that some developers will annotate with comments to indicate Labels for IDEs to jump to the right
place, ect.

### Why to have an Aggregate per Command?

The needs for an given Aggregate is driven by the command handler that requires such "state", it is not the other way
around. The same problems with the God Module can happen with the God Aggregate.

In practices, most of the `apply/2` functions will be a simple functions, barely anything passed basic permutations.
Since the Aggregate state needs is driven by the command handler, it makes sense to have a 1:1 relationship between
the command and the aggregate.

The only dependency between the command handlers are the events, not the "state" of the aggregate. The existence of
a given event may be due to another command handler, that is OK, a set of Command Handlers acts upon the same
facts/events, the past is immutable, the past is the same for everyone, not the state of the aggregate.
That is one reason why when you are doing testing you speak in terms of "given previous events" and not
"given previous state".

Being said, be pragmatic.

### FAQ

Q: Should I have a `command.ex` and `command/aggregate.ex` files?
A: Yes, but it is discouraged, use the same `command.ex` file to put the
command and the aggregate. As we said before, high cohesion of the code is
the goal, not the number of files, the more files you have, the more entropy
you have to deal with, and since the aggregate is driven by the command
handler, you are most likely to modify the command and the aggregate at the
same time, or in the best case, ignore the aggregate module.

Here is an example file structure:

defmodule BankAccount.OpenBankAccount.Aggregate do
use OnePiece.Commanded.Aggregate, identifier: :uuid
embedded_schema do
# ...
end
end

defmodule BankAccount.OpenBankAccount do
use OnePiece.Commanded.CommandHandler
use OnePiece.Commanded.Command,
aggregate_identifier: :uuid,
stream_prefix: "bank-account-"

alias BankAccount.{
BankAccountOpened,
OpenBankAccount
}

embedded_schema do
end

def handle(%OpenBankAccount.Aggregate{} = aggregate, %OpenBankAccount{} = command) do
# ...
end
end



Q: Am I allowed to reuse the same aggregate for multiple commands?
A: Yes, you can, but it is discouraged, since it can lead to the God Aggregate problem.

Q: Am I allowed to reuse code between aggregates?
A: Yes, you can, but it is discouraged, a simple copy+paste could safe you from a lot of headaches in the future. Be
mindful of the [Rule of Three](https://en.wikipedia.org/wiki/Rule_of_three_(computer_programming)). Use the
`MyApp.[Stream Name].[Stream Name]` module as a place to put common code between aggregates.
"""
@spec register_transaction_script(
command_module :: module(),
opts :: [aggregate: module(), lifespan: module()]
) :: Macro.t()
defmacro register_transaction_script(command_module, opts \\ []) do
command_module = Macro.expand(command_module, __CALLER__)

aggregate_module =
opts
|> Keyword.replace_lazy(:aggregate, &Macro.expand(&1, __CALLER__))
|> Keyword.get(:aggregate, Module.concat([command_module, "Aggregate"]))

Code.ensure_compiled!(command_module)
Code.ensure_compiled!(aggregate_module)

opts =
opts
|> Keyword.take([:before_execute, :timeout, :lifespan, :consistency])
|> Keyword.put(:aggregate, aggregate_module)
|> Keyword.put(:to, command_module)
|> Keyword.put(:identity, Kernel.apply(command_module, :aggregate_identifier, []))
|> Keyword.put(:identity_prefix, Kernel.apply(command_module, :stream_prefix, []))
|> Keyword.put_new(:lifespan, OnePiece.Commanded.Aggregate.StatelessLifespan)

quote do
Commanded.Commands.Router.dispatch(unquote(command_module), unquote(opts))
end
end
end
2 changes: 1 addition & 1 deletion apps/one_piece_commanded/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule OnePiece.Commanded.MixProject do
use Mix.Project

@app :one_piece_commanded
@version "0.19.1"
@version "0.20.0"
@elixir_version "~> 1.13"
@source_url "https://github.com/straw-hat-team/beam-monorepo"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule OnePiece.Commanded.CommandRouterTest do
use ExUnit.Case, async: true

alias TestSupport.CommandRouterExample.{
CommandRouter,
OpenBankAccount
}

@dispatch_opts [
application: TestSupport.DefaultApp,
returning: :execution_result
]

setup do
start_supervised!(TestSupport.DefaultApp)
:ok
end

describe "dispatch_transaction/2" do
test "dispatches a transaction" do
{:ok, result} =
%{uuid: "uuid-1"}
|> OpenBankAccount.new!()
|> CommandRouter.dispatch(@dispatch_opts)

assert result.aggregate_uuid == "bank-account-uuid-1"
assert result.aggregate_state == %OpenBankAccount.Aggregate{uuid: "uuid-1"}
end
end
end
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule OnePiece.Commanded.StatelessLifespanTest do
use ExUnit.Case, async: true
alias TestSupport.{DepositAccountOpened, MyCommandOne, AccountNumber}
alias TestSupport.{DepositAccountOpened, MyCommandOne}
doctest OnePiece.Commanded.Aggregate.StatelessLifespan
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
defmodule TestSupport.CommandRouterExample.BankAccountOpened do
@moduledoc false
use OnePiece.Commanded.Event, aggregate_identifier: :uuid

embedded_schema do
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
defmodule TestSupport.CommandRouterExample.CommandRouter do
@moduledoc false
use OnePiece.Commanded.CommandRouter
register_transaction_script(TestSupport.CommandRouterExample.OpenBankAccount)
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
defmodule TestSupport.CommandRouterExample.OpenBankAccount.Aggregate do
@moduledoc false
use OnePiece.Commanded.Aggregate, identifier: :uuid
alias TestSupport.CommandRouterExample.BankAccountOpened

embedded_schema do
end

@impl OnePiece.Commanded.Aggregate
def apply(aggregate, %BankAccountOpened{} = event) do
aggregate
|> Map.put(:uuid, event.uuid)
end
end

defmodule TestSupport.CommandRouterExample.OpenBankAccount do
@moduledoc false
use OnePiece.Commanded.CommandHandler

use OnePiece.Commanded.Command,
aggregate_identifier: :uuid,
stream_prefix: "bank-account-"

alias TestSupport.CommandRouterExample.{
BankAccountOpened,
OpenBankAccount
}

embedded_schema do
end

def handle(%OpenBankAccount.Aggregate{} = _aggregate, %OpenBankAccount{} = command) do
%BankAccountOpened{uuid: command.uuid}
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ defmodule TestSupport do
end
end

defmodule DefaultApp do
@moduledoc false
use Commanded.Application,
otp_app: :one_piece_commanded,
event_store: [
adapter: Commanded.EventStore.Adapters.InMemory,
serializer: Commanded.Serialization.JsonSerializer
],
pubsub: :local,
registry: :local
end

def errors_on(changeset) do
Ecto.Changeset.traverse_errors(changeset, fn {message, opts} ->
Regex.replace(~r"%{(\w+)}", message, fn _, key ->
Expand Down
Loading