Skip to content

Commit

Permalink
Vectorclock abstraction (#370)
Browse files Browse the repository at this point in the history
* Work in progress

Rewriting the meta data sender module abstract away from dicts

* wip test fixture

* wip:tests

* wip

* wip

* Unit tests now passing!

* Renaming of vectorclock functions

* wip: Fixing the calls

* Fixing systests

* Fixing test fixture + cleaning up

* refactoring

* Adding documentation and further simplifications

* Refactorings

* Update src/stable_time_functions.erl

Co-Authored-By: Peter Zeller <p_zeller@cs.uni-kl.de>

* Update src/meta_data_sender.erl

Co-Authored-By: Peter Zeller <p_zeller@cs.uni-kl.de>

*  adaptions to vectorclock 0.2.0

* Some bug fixes

* Fixing the test fixture

* Upgrade vector clock dependency

* Bumping version
  • Loading branch information
bieniusa authored Jun 13, 2019
1 parent b7a6481 commit 4cf08fc
Show file tree
Hide file tree
Showing 23 changed files with 406 additions and 684 deletions.
16 changes: 8 additions & 8 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
{ref,"573d583930c4b1134e504bec83926d188112b401"}},
0},
{<<"exometer_core">>,{pkg,<<"basho_exometer_core">>,<<"1.0.5">>},1},
{<<"folsom">>,{pkg,<<"folsom">>,<<"0.8.7">>},1},
{<<"folsom">>,{pkg,<<"folsom">>,<<"0.8.8">>},1},
{<<"gen_fsm_compat">>,{pkg,<<"gen_fsm_compat">>,<<"0.3.0">>},1},
{<<"getopt">>,{pkg,<<"getopt">>,<<"1.0.1">>},2},
{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1},
{<<"lager">>,{pkg,<<"lager">>,<<"3.6.9">>},0},
{<<"lager">>,{pkg,<<"lager">>,<<"3.7.0">>},0},
{<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.0">>},2},
{<<"pbkdf2">>,{pkg,<<"pbkdf2">>,<<"2.0.0">>},1},
{<<"poolboy">>,{pkg,<<"basho_poolboy">>,<<"0.8.4">>},1},
{<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.2.2">>},0},
{<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.4.0">>},0},
{<<"prometheus_process_collector">>,
{pkg,<<"prometheus_process_collector">>,<<"1.4.3">>},
0},
Expand All @@ -35,7 +35,7 @@
{<<"riak_ensemble">>,{pkg,<<"riak_ensemble_ng">>,<<"2.4.4">>},1},
{<<"riak_sysmon">>,{pkg,<<"riak_sysmon">>,<<"2.1.7">>},1},
{<<"setup">>,{pkg,<<"setup">>,<<"2.0.2">>},2},
{<<"vectorclock">>,{pkg,<<"vectorclock">>,<<"0.1.0">>},0}]}.
{<<"vectorclock">>,{pkg,<<"vectorclock">>,<<"0.2.0">>},0}]}.
[
{pkg_hash,[
{<<"accept">>, <<"2505B60BCB992CA79BD03AB7B8FEC8A520A47D9730F286DF1A479CC98B03F94B">>},
Expand All @@ -52,21 +52,21 @@
{<<"elli">>, <<"7842861819869EBBFF7230BC77ECF2DF551AE3EAEF5FDE6B01A7561CACCB811E">>},
{<<"elli_prometheus">>, <<"FF41EA8D88D1EBD1CD7A6D43FCC02B33B47FF20272C097B9D3A3CCCD79980C05">>},
{<<"exometer_core">>, <<"8B37B9B07815B964E67C8D789D823B57E10DAF375EC146016527AECFC13D03C7">>},
{<<"folsom">>, <<"A885F0AEEE4C84270954C88A55A5A473D6B2C7493E32FFDC5765412DD555A951">>},
{<<"folsom">>, <<"9A2B02010F6727CB1948EF34E21CB66F3554B63355C3A31E8BCB10FB172C3170">>},
{<<"gen_fsm_compat">>, <<"5903549F67D595F58A7101154CBE0FDD46955FBFBE40813F1E53C23A970FF5F4">>},
{<<"getopt">>, <<"C73A9FA687B217F2FF79F68A3B637711BB1936E712B521D8CE466B29CBF7808A">>},
{<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>},
{<<"lager">>, <<"387BCD836DC0C8AD9C6D90A0E0CE5B29676847950CBC527BCCC194A02028DE8E">>},
{<<"lager">>, <<"563AB17CD32134A3DD17EC3B3622E6D8F827506AA4F8C489158879BED87D980B">>},
{<<"parse_trans">>, <<"09765507A3C7590A784615CFD421D101AEC25098D50B89D7AA1D66646BC571C1">>},
{<<"pbkdf2">>, <<"11C23279FDED5C0027AB3996CFAE77805521D7EF4BABDE2BD7EC04A9086CF499">>},
{<<"poolboy">>, <<"45C306FF1C9F6451730DD21642EDF55FA72EBD5E2FE4A38D8D8A56B8EA21A256">>},
{<<"prometheus">>, <<"A830E77B79DC6D28183F4DB050A7CAC926A6C58F1872F9EF94A35CD989ACEEF8">>},
{<<"prometheus">>, <<"6DCB11FC80FAF873CB2297664720414AED16B0A4FC3A1A15AE538D66F84CCC34">>},
{<<"prometheus_process_collector">>, <<"657386E8F142FC817347D95C1F3A05AB08710F7DF9E7F86DB6FACAED107ED929">>},
{<<"ranch">>, <<"6B1FAB51B49196860B733A49C07604465A47BDB78AA10C1C16A3D199F7F8C881">>},
{<<"rand_compat">>, <<"011646BC1F0B0C432FE101B816F25B9BBB74A085713CEE1DAFD2D62E9415EAD3">>},
{<<"riak_core">>, <<"098CEEF293F9F232E724DD8745916F2E4CE0F14CB8EDD1653E8425A14F0DD0B6">>},
{<<"riak_ensemble">>, <<"F9E04052F4A7FAAD20F008DFF18D34D3552513000410CE9C5941B4F7361741E8">>},
{<<"riak_sysmon">>, <<"AF420DF0F7569E1F12BCD465745164CB6189EB93F118D5CDB3F90FEB3F8BF47D">>},
{<<"setup">>, <<"1203F4CDA11306C2E34434244576DED0A7BBFB0908D9A572356C809BD0CDF085">>},
{<<"vectorclock">>, <<"CBC79A44193756CF20B9AC8DDB9F268FAF1FE84D00C396DB447373E6D1FEDFC5">>}]}
{<<"vectorclock">>, <<"6C4A9D44895F51BB99910DBE31FC691BF05FA6B2BF84986F6E3BDE4BD18F6CBA">>}]}
].
2 changes: 1 addition & 1 deletion src/antidote.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{application, antidote,
[
{description, "A transactional CRDT database"},
{vsn, "0.0.2"},
{vsn, "0.2.0"},
{registered, []},
{applications, [
kernel,
Expand Down
2 changes: 1 addition & 1 deletion src/antidote_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ start(_StartType, _StartArgs) ->
case application:get_env(antidote, auto_start_read_servers) of
{ok, true} ->
%% start read servers
inter_dc_manager:start_bg_processes(stable);
inter_dc_manager:start_bg_processes(stable_time_functions);
_->
ok %dont_start_read_servers
end,
Expand Down
2 changes: 1 addition & 1 deletion src/antidote_stats_collector.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ init_metrics() ->
calculate_staleness() ->
{ok, SS} = dc_utilities:get_stable_snapshot(),
CurrentClock = to_microsec(os:timestamp()),
Staleness = dict:fold(fun(_K, C, Max) ->
Staleness = vectorclock:fold(fun(_K, C, Max) ->
max(CurrentClock - C, Max)
end, 0, SS),
round(Staleness/(1000)). %% To millisecs
Expand Down
4 changes: 2 additions & 2 deletions src/antidote_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ init(_Args) ->


MetaDataManagerSup = {meta_data_manager_sup,
{meta_data_manager_sup, start_link, [stable]},
{meta_data_manager_sup, start_link, [stable_time_functions]},
permanent, 5000, supervisor,
[meta_data_manager_sup]},

MetaDataSenderSup = {meta_data_sender_sup,
{meta_data_sender_sup, start_link, [stable_time_functions:export_funcs_and_vals()]},
{meta_data_sender_sup, start_link, [[stable_time_functions]]},
permanent, 5000, supervisor,
[meta_data_sender_sup]},

Expand Down
15 changes: 8 additions & 7 deletions src/clocksi_interactive_coord.erl
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ perform_singleitem_update(Clock, Key, Type, Params, Properties) ->
TxId = Transaction#transaction.txn_id,
DcId = ?DC_META_UTIL:get_my_dc_id(),

CausalClock = ?VECTORCLOCK:set_clock_of_dc(
CausalClock = ?VECTORCLOCK:set(
DcId,
CommitTime,
Transaction#transaction.vec_snapshot_time
Expand Down Expand Up @@ -687,7 +687,7 @@ create_transaction_record(ClientClock, StayAlive, From, _IsStatic, Properties) -
end
end,
DcId = ?DC_META_UTIL:get_my_dc_id(),
LocalClock = ?VECTORCLOCK:get_clock_of_dc(DcId, SnapshotTime),
LocalClock = ?VECTORCLOCK:get(DcId, SnapshotTime),
Name = case StayAlive of
true ->
generate_name(From);
Expand Down Expand Up @@ -837,7 +837,7 @@ reply_to_client(State = #coord_state{
_Result = execute_post_commit_hooks(ClientOps),
%% TODO: What happens if commit hook fails?
DcId = ?DC_META_UTIL:get_my_dc_id(),
CausalClock = ?VECTORCLOCK:set_clock_of_dc(DcId, CommitTime, Transaction#transaction.vec_snapshot_time),
CausalClock = ?VECTORCLOCK:set(DcId, CommitTime, Transaction#transaction.vec_snapshot_time),
case IsStatic of
false ->
{ok, {TxId, CausalClock}};
Expand Down Expand Up @@ -908,7 +908,7 @@ get_snapshot_time() ->
Now = dc_utilities:now_microsec() - ?OLD_SS_MICROSEC,
{ok, VecSnapshotTime} = ?DC_UTIL:get_stable_snapshot(),
DcId = ?DC_META_UTIL:get_my_dc_id(),
SnapshotTime = vectorclock:set_clock_of_dc(DcId, Now, VecSnapshotTime),
SnapshotTime = vectorclock:set(DcId, Now, VecSnapshotTime),
{ok, SnapshotTime}.


Expand All @@ -921,6 +921,7 @@ wait_for_clock(Clock) ->
{ok, VecSnapshotTime};
false ->
%% wait for snapshot time to catch up with Client Clock
%TODO Refactor into constant
timer:sleep(10),
wait_for_clock(Clock)
end.
Expand Down Expand Up @@ -1253,13 +1254,13 @@ downstream_fail_test(Pid) ->

get_snapshot_time_test() ->
{ok, SnapshotTime} = get_snapshot_time(),
?assertMatch([{mock_dc, _}], dict:to_list(SnapshotTime)).
?assertMatch([{mock_dc, _}], vectorclock:to_list(SnapshotTime)).

wait_for_clock_test() ->
{ok, SnapshotTime} = wait_for_clock(vectorclock:from_list([{mock_dc, 10}])),
?assertMatch([{mock_dc, _}], dict:to_list(SnapshotTime)),
?assertMatch([{mock_dc, _}], vectorclock:to_list(SnapshotTime)),
VecClock = dc_utilities:now_microsec(),
{ok, SnapshotTime2} = wait_for_clock(vectorclock:from_list([{mock_dc, VecClock}])),
?assertMatch([{mock_dc, _}], dict:to_list(SnapshotTime2)).
?assertMatch([{mock_dc, _}], vectorclock:to_list(SnapshotTime2)).

-endif.
33 changes: 10 additions & 23 deletions src/clocksi_materializer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ materialize_intern_perform(Type, OpList, LastOp, FirstHole, SnapshotCommitTime,
is_op_in_snapshot(TxId, Op, {OpDc, OpCommitTime}, OperationSnapshotTime, SnapshotTime, LastSnapshot, PrevTime) ->
%% First check if the op was already included in the previous snapshot
%% Is the "or TxId ==" part necessary and correct????
case materializer:belongs_to_snapshot_op(
LastSnapshot, {OpDc, OpCommitTime}, OperationSnapshotTime) or (TxId == Op#clocksi_payload.txid) of
case materializer:belongs_to_snapshot_op(LastSnapshot, {OpDc, OpCommitTime}, OperationSnapshotTime)
orelse (TxId == Op#clocksi_payload.txid) of
true ->
%% If not, check if it should be included in the new snapshot
%% Replace the snapshot time of the dc where the transaction committed with the commit time
OpSSCommit = dict:store(OpDc, OpCommitTime, OperationSnapshotTime),
OpSSCommit = vectorclock:set(OpDc, OpCommitTime, OperationSnapshotTime),
%% PrevTime2 is the time of the previous snapshot, if there was none, it usues the snapshot time
%% of the new operation
PrevTime2 = case PrevTime of
Expand All @@ -232,30 +232,17 @@ is_op_in_snapshot(TxId, Op, {OpDc, OpCommitTime}, OperationSnapshotTime, Snapsho
end,
%% Result is true if the op should be included in the snapshot
%% NewTime is the vectorclock of the snapshot with the time of Op included
{Result, NewTime} =
dict:fold(fun(DcIdOp, TimeOp, {Acc, PrevTime3}) ->
Res1 = case dict:find(DcIdOp, SnapshotTime) of
{ok, TimeSS} ->
case TimeSS < TimeOp of
{Result, NewTime} = vectorclock:fold(fun(DcIdOp, TimeOp, {Acc, PrevTime3}) ->
TimeSS = vectorclock:get(DcIdOp, SnapshotTime),
Res1 = case TimeSS < TimeOp of
true ->
false;
false ->
Acc
end;
error ->
logger:error("Could not find DC in SS ~p", [SnapshotTime]),
false
end,
Res2 = dict:update(DcIdOp, fun(Val) ->
case TimeOp > Val of
true ->
TimeOp;
false ->
Val
end
end, TimeOp, PrevTime3),
{Res1, Res2}
end, {true, PrevTime2}, OpSSCommit),
end,
Res2 = vectorclock:update_with(DcIdOp, fun(Val) -> max(TimeOp, Val) end, TimeOp, PrevTime3),
{Res1, Res2}
end, {true, PrevTime2}, OpSSCommit),
case Result of
true ->
{true, false, NewTime};
Expand Down
6 changes: 3 additions & 3 deletions src/cure.erl
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,14 @@ gr_snapshot_obtain(ClientClock, Objects, StateOrValue) ->
%% VST = vector stable time with entries for each dc
{ok, GST, VST} = dc_utilities:get_scalar_stable_time(),
DcId = dc_meta_data_utilities:get_my_dc_id(),
Dt = vectorclock:get_clock_of_dc(DcId, ClientClock),
Dt = vectorclock:get(DcId, ClientClock),
case Dt =< GST of
true ->
%% Set all entries in snapshot as GST
ST = dict:map(fun(_, _) -> GST end, VST),
ST = vectorclock:set_all(GST, VST),
%% ST doesnot contain entry for local dc, hence explicitly
%% add it in snapshot time
SnapshotTime = vectorclock:set_clock_of_dc(DcId, GST, ST),
SnapshotTime = vectorclock:set(DcId, GST, ST),
{ok, TxId} = clocksi_istart_tx(SnapshotTime, [{update_clock, false}], false),
case obtain_objects(Objects, TxId, StateOrValue) of
{ok, Res} ->
Expand Down
52 changes: 20 additions & 32 deletions src/dc_utilities.erl
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ ensure_all_vnodes_running(VnodeType) ->
true -> ok;
false ->
logger:debug("Waiting for vnode ~p: required ~p, spawned ~p", [VnodeType, Partitions, Running]),
%TODO: Extract into configuration constant
timer:sleep(250),
ensure_all_vnodes_running(VnodeType)
end.
Expand All @@ -197,6 +198,7 @@ bcast_vnode_check_up(VMaster, Request, [P|Rest]) ->
case Err of
true ->
logger:debug("Vnode not up retrying, ~p, ~p", [VMaster, P]),
%TODO: Extract into configuration constant
timer:sleep(1000),
bcast_vnode_check_up(VMaster, Request, [P|Rest]);
false ->
Expand All @@ -217,16 +219,16 @@ ensure_all_vnodes_running_master(VnodeType) ->
check_registered(VnodeType),
bcast_vnode_check_up(VnodeType, {hello}, get_all_partitions()).

%% Prints to the console the staleness between this DC and all
%% Prints to the logging framework the staleness between this DC and all
%% other DCs that it is connected to
-spec check_staleness() -> ok.
check_staleness() ->
Now = dc_utilities:now_microsec(),
{ok, SS} = get_stable_snapshot(),
dict:fold(fun(DcId, Time, _Acc) ->
io:format("~w staleness: ~w ms ~n", [DcId, (Now-Time)/1000]),
ok
end, ok, SS).
PrintFun = fun(DcId, Time) ->
logger:debug("~w staleness: ~w ms ~n", [DcId, (Now-Time)/1000]) end,
_ = vectorclock:map(PrintFun, SS),
ok.

%% Loops until a process with the given name is registered locally
-spec check_registered(atom()) -> ok.
Expand All @@ -245,9 +247,10 @@ check_registered(Name) ->
%% in all partitions
-spec get_stable_snapshot() -> {ok, snapshot_time()}.
get_stable_snapshot() ->
case meta_data_sender:get_merged_data(stable) of
case meta_data_sender:get_merged_data(stable_time_functions, vectorclock:new()) of
undefined ->
%% The snapshot isn't realy yet, need to wait for startup
%% The snapshot isn't ready yet, need to wait for startup
%TODO: Extract into configuration constant
timer:sleep(10),
get_stable_snapshot();
SS ->
Expand All @@ -260,27 +263,20 @@ get_stable_snapshot() ->
%% For gentlerain use the same format as clocksi
%% But, replicate GST to all entries in the dict
StableSnapshot = SS,
case dict:size(StableSnapshot) of
case vectorclock:size(StableSnapshot) of
0 ->
{ok, StableSnapshot};
_ ->
ListTime = dict:fold(
fun(_Key, Value, Acc) ->
[Value | Acc ]
end, [], StableSnapshot),
GST = lists:min(ListTime),
{ok, dict:map(
fun(_K, _V) ->
GST
end,
StableSnapshot)}
DCs = dc_meta_data_utilities:get_dc_ids(true),
GST = vectorclock:min_clock(StableSnapshot, DCs),
{ok, vectorclock:set_all(GST, StableSnapshot)}
end
end
end.

-spec get_partition_snapshot(partition_id()) -> snapshot_time().
get_partition_snapshot(Partition) ->
case meta_data_sender:get_meta_dict(stable, Partition) of
case meta_data_sender:get_meta(stable_time_functions, Partition, vectorclock:new()) of
undefined ->
%% The partition isn't ready yet, wait for startup
timer:sleep(10),
Expand All @@ -291,12 +287,10 @@ get_partition_snapshot(Partition) ->

%% Returns the minimum value in the stable vector snapshot time
%% Useful for gentlerain protocol.
-spec get_scalar_stable_time() -> {ok, non_neg_integer(), vectorclock()}.
-spec get_scalar_stable_time() -> {ok, pos_integer(), vectorclock()}.
get_scalar_stable_time() ->
{ok, StableSnapshot} = get_stable_snapshot(),
%% dict:is_empty/1 is not available, hence using dict:size/1
%% to check whether it is empty
case dict:size(StableSnapshot) of
case vectorclock:size(StableSnapshot) of
0 ->
%% This case occur when updates from remote replicas has not yet received
%% or when there are no remote replicas
Expand All @@ -305,15 +299,9 @@ get_scalar_stable_time() ->
Now = dc_utilities:now_microsec() - ?OLD_SS_MICROSEC,
{ok, Now, StableSnapshot};
_ ->
%% This is correct only if stablesnapshot has entries for
%% all DCs. Inorder to check that we need to configure the
%% number of DCs in advance, which is not possible now.
ListTime = dict:fold(
fun(_Key, Value, Acc) ->
[Value | Acc ]
end, [], StableSnapshot),
GST = lists:min(ListTime),
{ok, GST, StableSnapshot}
DCs = dc_meta_data_utilites:get_dc_ids(true),
GST = vectorclock:min_clock(StableSnapshot, DCs),
{ok, GST, vectorclock:set_all(GST, StableSnapshot)}
end.

%% Loops until a process with the given name is registered globally
Expand Down
Loading

0 comments on commit 4cf08fc

Please sign in to comment.