Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

try implement coordinator #1

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,3 @@ config :sasl,
errlog_type: :error

import_config "#{Mix.env()}.exs"

10 changes: 6 additions & 4 deletions lib/etude/kvs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Etude.KVS do
[] ->
nil

[test(value: value)|_] ->
[test(value: value) | _] ->
value
end
end
Expand All @@ -37,10 +37,12 @@ defmodule Etude.KVS do
@spec create_table() :: :test
def create_table do
:ets.new(:test, [
:public, :named_table, :set,
:public,
:named_table,
:set,
{:keypos, test(:key) + 1},
{:write_concurrency, true},
{:read_concurrency, true}]
)
{:read_concurrency, true}
])
end
end
56 changes: 56 additions & 0 deletions lib/etude/stat/read_fsm.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
defmodule Etude.Stat.ReadFSM do
@moduledoc """
The coordinator for the stat get operations.
The key here is to generate the preflist just like in write_fsm and then
query each replica and wait until a quorum is met.
"""

alias :gen_statem, as: GenStatem
alias Etude.Stat.ReadFSMSup
alias Etude.Stat.Utils

@behaviour GenStatem

defmodule Data do
@moduledoc false

defstruct req_id: 0,
from: nil,
client: nil,
stat_name: nil,
preflist: [],
num_read: 0,
replies: []

def new([req_id, from, client, stat_name]) do
%Data{
req_id: req_id,
from: from,
client: client,
stat_name: stat_name
}
end
end

# gen_statem callback functions

@n 3
@r 2

def init(init_args) do
data = Data.new(init_args)
{:ok, :prepare, data, [{:next_event, :internal, :do}]}
end

def prepare(:internal, :do, %Data{} = data0) do
doc_idx = Utils.make_chash_key(data0.client, data0.stat_name)
preflist = :riak_core_apl.get_apl(doc_idx, @n, :etude_stat)
data = %{data0 | preflist: preflist}
{:next_state, :execute, data, [{:next_event, :internal, :do}]}
end

def execute(:internal, :do, %Data{} = data0) do
Etude.Stat.VNode.get(data0.preflist, data0.req_id, data0.stat_name)
{:next_state, :wating, data0}
end
end
32 changes: 32 additions & 0 deletions lib/etude/stat/read_fsm_sup.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule Etude.Stat.ReadFSMSup do
use Supervisor

alias Etude.Stat.ReadFSM

@read_fsm_spec %{
id: :undefined,
start: {ReadFSM, :start_link, []},
restart: :temporary,
shutdown: 5000,
type: :worker,
modules: [ReadFSM]
}

@sup_flags [
strategy: :simple_one_for_one,
max_restarts: 10,
max_seconds: 10
]

def start_reader(args) do
Supervisor.start_child(__MODULE__, args)
end

def start_link do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end

def init(_args) do
Supervisor.init([@read_fsm_spec], @sup_flags)
end
end
12 changes: 12 additions & 0 deletions lib/etude/stat/utils.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule Etude.Stat.Utils do
@moduledoc false

@spec make_reqid() :: non_neg_integer()
def make_reqid, do: :erlang.phash2(:os.timestamp())

@spec make_chash_key(charlist(), charlist()) :: binary()
def make_chash_key(client, stat_name) do
bkey = {client, stat_name}
:riak_core_util.chash_key(bkey)
end
end
121 changes: 121 additions & 0 deletions lib/etude/stat/write_fsm.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
defmodule Etude.Stat.WriteFSM do
@moduledoc """
The coordinator for stat write operations.
This example will show how to properly replicate data in riak_core by making use
of the _preflist_.
"""

alias :gen_statem, as: GenStatem
alias Etude.Stat.WriteFSMSup
alias Etude.Stat.Utils

@behaviour GenStatem

defmodule Data do
@moduledoc false

defstruct req_id: 0,
from: nil,
client: nil,
stat_name: nil,
op: nil,
value: :undefined,
preflist: [],
num_write: 0

def new([req_id, from, client, stat_name, op, value]) do
%Data{
req_id: req_id,
from: from,
client: client,
stat_name: stat_name,
value: value,
op: op
}
end
end

# API functions

def write(client, stat_name, op),
do: write(client, stat_name, op, :undefind)

def write(client, stat_name, op, value) do
req_id = Utils.make_reqid()

start_args = [
req_id,
self(),
client,
stat_name,
op,
value
]

WriteFSMSup.start_writer(start_args)
end

# gen_statem callback functions

@n 3
@w 2

def callback_mode, do: :state_functions

def start_link(req_id, from, client, stat_name, op),
do: start_link(req_id, from, client, stat_name, op, :undefined)

def start_link(req_id, from, client, stat_name, op, value),
do: GenStatem.start_link(__MODULE__, [req_id, from, client, stat_name, op, value], [])

def init(init_args) do
data = Data.new(init_args)
{:ok, :prepare, data, [{:next_event, :internal, :do}]}
end

def prepare(:internal, :do, %Data{client: client, stat_name: stat_name} = data0) do
doc_idx = Utils.make_chash_key(client, stat_name)
preflist = :riak_core_apl.get_apl(doc_idx, @n, :etude_stat)
data = %{data0 | preflist: preflist}
{:next_state, :execute, data, [{:next_event, :internal, :do}]}
end

def execute(:internal, :do, %Data{} = data0) do
_ =
do_execute(
data0.preflist,
data0.req_id,
data0.stat_name,
data0.op,
data0.value
)

{:next_state, :wating, data0}
end

def wating(:info, {:ok, req_id}, %Data{} = data0),
do: handle_reply_from_vnode(req_id, %{data0 | num_write: data0.num_w + 1})

# private functions

@spec do_execute(
preflist :: :riak_core_apl.preflist2(),
req_id :: pos_integer(),
stat_name :: charlist(),
op :: atom(),
value :: :undefined | term()
) :: term()
defp do_execute(preflist, req_id, stat_name, op, :undefined),
do: apply(Etude.Stat.Vnode, op, [preflist, req_id, stat_name])

defp do_execute(preflist, req_id, stat_name, op, value),
do: apply(Etude.Stat.Vnode, op, [preflist, req_id, stat_name, value])

defp handle_reply_from_vnode(req_id, %Data{num_write: num_write} = data)
when num_write == @w do
:ok = Process.send(data.from, {req_id, :ok}, [])
{:stop, :normal, data}
end

defp handle_reply_from_vnode(_, data), do: {:keep_state, data}
end
32 changes: 32 additions & 0 deletions lib/etude/stat/write_fsm_sup.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule Etude.Stat.WriteFSMSup do
use Supervisor

alias Etude.Stat.WriteFSM

@write_fsm_spec %{
id: :undefined,
start: {WriteFSM, :start_link, []},
restart: :temporary,
shutdown: 5000,
type: :worker,
modules: [WriteFSM]
}

@sup_flags [
strategy: :simple_one_for_one,
max_restarts: 10,
max_seconds: 10
]

def start_writer(args) do
Supervisor.start_child(__MODULE__, args)
end

def start_link do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end

def init(_args) do
Supervisor.init([@write_fsm_spec], @sup_flags)
end
end
56 changes: 39 additions & 17 deletions lib/etude/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,50 @@ defmodule Etude.Supervisor do
Top level supervisor
"""

@vnode_spec %{
id: Etude.VNode_master,
start: {:riak_core_vnode_master, :start_link, [Etude.VNode]},
restart: :permanent,
shutdown: 5000,
type: :worker,
modules: [:riak_core_vnode_master]
}

@write_sup_spec %{
id: Etude.Stat.WriteFSMSup,
start: {Etude.Stat.WriteFSMSup, :start_link, []},
restart: :permanent,
timeout: :infinity,
type: :supervisor,
modules: [Etude.Stat.WriteFSMSup]
}

@read_sup_spec %{
id: Etude.Stat.ReadFSMSup,
start: {Etude.Stat.ReadFSMSup, :start_link, []},
restart: :permanent,
timeout: :infinity,
type: :supervisor,
modules: [Etude.Stat.ReadFSMSup]
}

@sup_flags [
strategy: :one_for_one,
max_restarts: 5,
max_seconds: 10
]

def start_link do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end

def init(_args) do
vnode_spec = {
Etude.VNode,
{:riak_core_vnode_master, :start_link, [Etude.VNode]},
:permanent,
5000,
:worker,
[:riak_core_vnode_master]
}

children = [vnode_spec]
children = [
@vnode_spec,
@write_sup_spec,
@read_sup_spec
]

Supervisor.init(
children,
strategy: :one_for_all,
max_restarts: 0,
max_seconds: 1,
name: __MODULE__
)
Supervisor.init(children, @sup_flags)
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ defmodule Etude.MixProject do
{:cuttlefish,
github: "rabbitmq/cuttlefish", branch: "develop", manager: :rebar3, override: true},
{:goldrush, github: "DeadZen/goldrush", tag: "0.1.9", manager: :rebar3, override: true},
{:lager, github: "erlang-lager/lager", override: true},
{:lager, github: "erlang-lager/lager", override: true}
]
end
end