diff --git a/config/config.exs b/config/config.exs index dcda6e7..d6e86fb 100644 --- a/config/config.exs +++ b/config/config.exs @@ -12,4 +12,3 @@ config :sasl, errlog_type: :error import_config "#{Mix.env()}.exs" - diff --git a/lib/etude/kvs.ex b/lib/etude/kvs.ex index 981b7ce..394b945 100644 --- a/lib/etude/kvs.ex +++ b/lib/etude/kvs.ex @@ -19,7 +19,7 @@ defmodule Etude.KVS do [] -> nil - [test(value: value)|_] -> + [test(value: value) | _] -> value end end @@ -37,10 +37,12 @@ defmodule Etude.KVS do @spec create_table() :: :test def create_table do :ets.new(:test, [ - :public, :named_table, :set, + :public, + :named_table, + :set, {:keypos, test(:key) + 1}, {:write_concurrency, true}, - {:read_concurrency, true}] - ) + {:read_concurrency, true} + ]) end end diff --git a/lib/etude/stat/read_fsm.ex b/lib/etude/stat/read_fsm.ex new file mode 100644 index 0000000..f1fdf06 --- /dev/null +++ b/lib/etude/stat/read_fsm.ex @@ -0,0 +1,56 @@ +defmodule Etude.Stat.ReadFSM do + @moduledoc """ + The coordinator for the stat get operations. + The key here is to generate the preflist just like in write_fsm and then + query each replica and wait until a quorum is met. + """ + + alias :gen_statem, as: GenStatem + alias Etude.Stat.ReadFSMSup + alias Etude.Stat.Utils + + @behaviour GenStatem + + defmodule Data do + @moduledoc false + + defstruct req_id: 0, + from: nil, + client: nil, + stat_name: nil, + preflist: [], + num_read: 0, + replies: [] + + def new([req_id, from, client, stat_name]) do + %Data{ + req_id: req_id, + from: from, + client: client, + stat_name: stat_name + } + end + end + + # gen_statem callback functions + + @n 3 + @r 2 + + def init(init_args) do + data = Data.new(init_args) + {:ok, :prepare, data, [{:next_event, :internal, :do}]} + end + + def prepare(:internal, :do, %Data{} = data0) do + doc_idx = Utils.make_chash_key(data0.client, data0.stat_name) + preflist = :riak_core_apl.get_apl(doc_idx, @n, :etude_stat) + data = %{data0 | preflist: preflist} + {:next_state, :execute, data, [{:next_event, :internal, :do}]} + end + + def execute(:internal, :do, %Data{} = data0) do + Etude.Stat.VNode.get(data0.preflist, data0.req_id, data0.stat_name) + {:next_state, :wating, data0} + end +end diff --git a/lib/etude/stat/read_fsm_sup.ex b/lib/etude/stat/read_fsm_sup.ex new file mode 100644 index 0000000..18a1028 --- /dev/null +++ b/lib/etude/stat/read_fsm_sup.ex @@ -0,0 +1,32 @@ +defmodule Etude.Stat.ReadFSMSup do + use Supervisor + + alias Etude.Stat.ReadFSM + + @read_fsm_spec %{ + id: :undefined, + start: {ReadFSM, :start_link, []}, + restart: :temporary, + shutdown: 5000, + type: :worker, + modules: [ReadFSM] + } + + @sup_flags [ + strategy: :simple_one_for_one, + max_restarts: 10, + max_seconds: 10 + ] + + def start_reader(args) do + Supervisor.start_child(__MODULE__, args) + end + + def start_link do + Supervisor.start_link(__MODULE__, [], name: __MODULE__) + end + + def init(_args) do + Supervisor.init([@read_fsm_spec], @sup_flags) + end +end diff --git a/lib/etude/stat/utils.ex b/lib/etude/stat/utils.ex new file mode 100644 index 0000000..98782fe --- /dev/null +++ b/lib/etude/stat/utils.ex @@ -0,0 +1,12 @@ +defmodule Etude.Stat.Utils do + @moduledoc false + + @spec make_reqid() :: non_neg_integer() + def make_reqid, do: :erlang.phash2(:os.timestamp()) + + @spec make_chash_key(charlist(), charlist()) :: binary() + def make_chash_key(client, stat_name) do + bkey = {client, stat_name} + :riak_core_util.chash_key(bkey) + end +end diff --git a/lib/etude/stat/write_fsm.ex b/lib/etude/stat/write_fsm.ex new file mode 100644 index 0000000..199e015 --- /dev/null +++ b/lib/etude/stat/write_fsm.ex @@ -0,0 +1,121 @@ +defmodule Etude.Stat.WriteFSM do + @moduledoc """ + The coordinator for stat write operations. + This example will show how to properly replicate data in riak_core by making use + of the _preflist_. + """ + + alias :gen_statem, as: GenStatem + alias Etude.Stat.WriteFSMSup + alias Etude.Stat.Utils + + @behaviour GenStatem + + defmodule Data do + @moduledoc false + + defstruct req_id: 0, + from: nil, + client: nil, + stat_name: nil, + op: nil, + value: :undefined, + preflist: [], + num_write: 0 + + def new([req_id, from, client, stat_name, op, value]) do + %Data{ + req_id: req_id, + from: from, + client: client, + stat_name: stat_name, + value: value, + op: op + } + end + end + + # API functions + + def write(client, stat_name, op), + do: write(client, stat_name, op, :undefind) + + def write(client, stat_name, op, value) do + req_id = Utils.make_reqid() + + start_args = [ + req_id, + self(), + client, + stat_name, + op, + value + ] + + WriteFSMSup.start_writer(start_args) + end + + # gen_statem callback functions + + @n 3 + @w 2 + + def callback_mode, do: :state_functions + + def start_link(req_id, from, client, stat_name, op), + do: start_link(req_id, from, client, stat_name, op, :undefined) + + def start_link(req_id, from, client, stat_name, op, value), + do: GenStatem.start_link(__MODULE__, [req_id, from, client, stat_name, op, value], []) + + def init(init_args) do + data = Data.new(init_args) + {:ok, :prepare, data, [{:next_event, :internal, :do}]} + end + + def prepare(:internal, :do, %Data{client: client, stat_name: stat_name} = data0) do + doc_idx = Utils.make_chash_key(client, stat_name) + preflist = :riak_core_apl.get_apl(doc_idx, @n, :etude_stat) + data = %{data0 | preflist: preflist} + {:next_state, :execute, data, [{:next_event, :internal, :do}]} + end + + def execute(:internal, :do, %Data{} = data0) do + _ = + do_execute( + data0.preflist, + data0.req_id, + data0.stat_name, + data0.op, + data0.value + ) + + {:next_state, :wating, data0} + end + + def wating(:info, {:ok, req_id}, %Data{} = data0), + do: handle_reply_from_vnode(req_id, %{data0 | num_write: data0.num_w + 1}) + + # private functions + + @spec do_execute( + preflist :: :riak_core_apl.preflist2(), + req_id :: pos_integer(), + stat_name :: charlist(), + op :: atom(), + value :: :undefined | term() + ) :: term() + defp do_execute(preflist, req_id, stat_name, op, :undefined), + do: apply(Etude.Stat.Vnode, op, [preflist, req_id, stat_name]) + + defp do_execute(preflist, req_id, stat_name, op, value), + do: apply(Etude.Stat.Vnode, op, [preflist, req_id, stat_name, value]) + + defp handle_reply_from_vnode(req_id, %Data{num_write: num_write} = data) + when num_write == @w do + :ok = Process.send(data.from, {req_id, :ok}, []) + {:stop, :normal, data} + end + + defp handle_reply_from_vnode(_, data), do: {:keep_state, data} +end diff --git a/lib/etude/stat/write_fsm_sup.ex b/lib/etude/stat/write_fsm_sup.ex new file mode 100644 index 0000000..2f0e483 --- /dev/null +++ b/lib/etude/stat/write_fsm_sup.ex @@ -0,0 +1,32 @@ +defmodule Etude.Stat.WriteFSMSup do + use Supervisor + + alias Etude.Stat.WriteFSM + + @write_fsm_spec %{ + id: :undefined, + start: {WriteFSM, :start_link, []}, + restart: :temporary, + shutdown: 5000, + type: :worker, + modules: [WriteFSM] + } + + @sup_flags [ + strategy: :simple_one_for_one, + max_restarts: 10, + max_seconds: 10 + ] + + def start_writer(args) do + Supervisor.start_child(__MODULE__, args) + end + + def start_link do + Supervisor.start_link(__MODULE__, [], name: __MODULE__) + end + + def init(_args) do + Supervisor.init([@write_fsm_spec], @sup_flags) + end +end diff --git a/lib/etude/supervisor.ex b/lib/etude/supervisor.ex index 40b54b6..27f4566 100644 --- a/lib/etude/supervisor.ex +++ b/lib/etude/supervisor.ex @@ -3,28 +3,50 @@ defmodule Etude.Supervisor do Top level supervisor """ + @vnode_spec %{ + id: Etude.VNode_master, + start: {:riak_core_vnode_master, :start_link, [Etude.VNode]}, + restart: :permanent, + shutdown: 5000, + type: :worker, + modules: [:riak_core_vnode_master] + } + + @write_sup_spec %{ + id: Etude.Stat.WriteFSMSup, + start: {Etude.Stat.WriteFSMSup, :start_link, []}, + restart: :permanent, + timeout: :infinity, + type: :supervisor, + modules: [Etude.Stat.WriteFSMSup] + } + + @read_sup_spec %{ + id: Etude.Stat.ReadFSMSup, + start: {Etude.Stat.ReadFSMSup, :start_link, []}, + restart: :permanent, + timeout: :infinity, + type: :supervisor, + modules: [Etude.Stat.ReadFSMSup] + } + + @sup_flags [ + strategy: :one_for_one, + max_restarts: 5, + max_seconds: 10 + ] + def start_link do Supervisor.start_link(__MODULE__, [], name: __MODULE__) end def init(_args) do - vnode_spec = { - Etude.VNode, - {:riak_core_vnode_master, :start_link, [Etude.VNode]}, - :permanent, - 5000, - :worker, - [:riak_core_vnode_master] - } - - children = [vnode_spec] + children = [ + @vnode_spec, + @write_sup_spec, + @read_sup_spec + ] - Supervisor.init( - children, - strategy: :one_for_all, - max_restarts: 0, - max_seconds: 1, - name: __MODULE__ - ) + Supervisor.init(children, @sup_flags) end end diff --git a/mix.exs b/mix.exs index 8c53d0a..55d1478 100644 --- a/mix.exs +++ b/mix.exs @@ -27,7 +27,7 @@ defmodule Etude.MixProject do {:cuttlefish, github: "rabbitmq/cuttlefish", branch: "develop", manager: :rebar3, override: true}, {:goldrush, github: "DeadZen/goldrush", tag: "0.1.9", manager: :rebar3, override: true}, - {:lager, github: "erlang-lager/lager", override: true}, + {:lager, github: "erlang-lager/lager", override: true} ] end end