diff --git a/include/antidote.hrl b/include/antidote.hrl index fd1555619..36aae9cdb 100644 --- a/include/antidote.hrl +++ b/include/antidote.hrl @@ -100,6 +100,7 @@ commit_time :: dc_and_commit_time(), snapshot_time :: snapshot_time() }). +-type commit_log_payload() :: #commit_log_payload{}. -record(update_log_payload, { key :: key(), @@ -107,6 +108,7 @@ type :: type(), op :: op() }). +-type update_log_payload() :: #update_log_payload{}. -record(abort_log_payload, {}). @@ -126,6 +128,7 @@ | noop, log_payload :: any_log_payload() }). +-type log_operation() :: #log_operation{}. -record(op_number, { %% TODO 19 undefined is required here, because of the use in inter_dc_log_sender_vnode. @@ -134,16 +137,17 @@ global :: undefined | non_neg_integer(), local :: undefined | non_neg_integer() }). +-type op_number() :: #op_number{}. %% The way records are stored in the log. -record(log_record, { %% The version of the log record, for backwards compatability version :: non_neg_integer(), - op_number :: #op_number{}, - bucket_op_number :: #op_number{}, - log_operation :: #log_operation{} + op_number :: op_number(), + bucket_op_number :: op_number(), + log_operation :: log_operation() }). - +-type log_record() :: #log_record{}. %% Clock SI %% MIN is Used for generating the timeStamp of a new snapshot @@ -174,7 +178,7 @@ last_op_id :: op_num(), value :: snapshot() }). - +-type materialized_snapshot() :: #materialized_snapshot{}. %%--------------------------------------------------------------------- -type actor() :: term(). -type key() :: term(). @@ -245,7 +249,10 @@ bound_object/0, module_name/0, function_name/0, - clocksi_payload/0]). + clocksi_payload/0, + materialized_snapshot/0, + snapshot_get_response/0, log_operation/0, log_record/0, op_number/0, + update_log_payload/0, commit_log_payload/0]). %% The record is using during materialization to keep the @@ -256,9 +263,10 @@ %% size of ops_list number_of_ops :: non_neg_integer(), %% the previous snapshot to apply the ops to - materialized_snapshot :: #materialized_snapshot{}, + materialized_snapshot :: materialized_snapshot(), %% The version vector time of the snapshot snapshot_time :: snapshot_time() | ignore, %% true if this is the most recent snapshot in the cache is_newest_snapshot :: boolean() }). +-type snapshot_get_response() :: #snapshot_get_response{}. \ No newline at end of file diff --git a/include/inter_dc_repl.hrl b/include/inter_dc_repl.hrl index 9d8ac7fe8..d9ab2b446 100644 --- a/include/inter_dc_repl.hrl +++ b/include/inter_dc_repl.hrl @@ -1,5 +1,7 @@ -include("antidote_message_types.hrl"). +-export_type([descriptor/0, interdc_txn/0, recvr_state/0, request_cache_entry/0]). + -record(recvr_state, {lastRecvd :: orddict:orddict(), %TODO: this may not be required lastCommitted :: orddict:orddict(), @@ -7,6 +9,7 @@ recQ :: orddict:orddict(), %% Holds recieving updates from each DC separately in causal order. statestore, partition}). +-type recvr_state() :: #recvr_state{}. -type socket_address() :: {inet:ip_address(), inet:port_number()}. -type zmq_socket() :: any(). @@ -16,13 +19,14 @@ -record(interdc_txn, { dcid :: dcid(), partition :: partition_id(), - prev_log_opid :: #op_number{} | none, %% the value is *none* if the transaction is read directly from the log + prev_log_opid :: op_number() | none, %% the value is *none* if the transaction is read directly from the log snapshot :: snapshot_time(), timestamp :: clock_time(), - last_update_opid :: undefined | #op_number{}, %% last opid of the txn that was an update operations (i.e. not a commit/abort) + last_update_opid :: undefined | op_number(), %% last opid of the txn that was an update operations (i.e. not a commit/abort) bucket :: bucket(), - log_records :: [#log_record{}] %% if the OP list is empty, the message is a HEARTBEAT + log_records :: [log_record()] %% if the OP list is empty, the message is a HEARTBEAT }). +-type interdc_txn() :: #interdc_txn{}. -record(descriptor, { dcid :: dcid(), @@ -31,6 +35,8 @@ logreaders :: [socket_address()] }). +-type descriptor() :: #descriptor{}. + %% This keeps information about an inter-dc request that %% is waiting for a reply -record(request_cache_entry, { @@ -40,6 +46,7 @@ pdcid :: pdcid(), binary_req :: binary() }). +-type request_cache_entry() :: #request_cache_entry{}. %% This keeps information about an inter-dc request %% on the site that is performing the query @@ -49,6 +56,7 @@ request_id_num_binary :: binary(), local_pid :: pid() }). +-type inter_dc_query_state() :: #inter_dc_query_state{}. %% State for sub buff -record(inter_dc_sub_buf, { @@ -58,3 +66,4 @@ queue :: queue:queue(), logging_enabled :: boolean() }). +-type inter_dc_sub_buf() :: #inter_dc_sub_buf{}. \ No newline at end of file diff --git a/rebar.config b/rebar.config index d3bc83e00..4fc509707 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,6 @@ {deps, [ + %% overwrite lager dependency since current version disables logger messages (see https://github.com/erlang-lager/lager/issues/492) + {lager, {git, "https://github.com/erlang-lager/lager", {ref, "411edc71ee9823f6ab9c6f617daccce3c6798a29"}}}, %% riak framework {riak_core, "3.1.1", {pkg,riak_core_ng}}, % ranch socket acceptor pool for managing protocol buffer sockets @@ -12,7 +14,6 @@ antidote_pb_codec, antidotec_pb, vectorclock, - lager, % expose metrics for prometheus as HTTP-API elli, @@ -130,7 +131,7 @@ {profiles,[ {lint, [ - {plugins, [{rebar3_lint, {git, "https://github.com/project-fifo/rebar3_lint.git", {tag, "0.1.2"}}}]} + {plugins, [{rebar3_lint, {git, "https://github.com/project-fifo/rebar3_lint.git", {tag, "v0.1.10"}}}]} ]}, {test, [ {erl_opts, [warnings_as_errors, debug_info, no_inline_list_funcs]}, @@ -151,7 +152,7 @@ {right, "++"}, {left, "++"}]}}, {elvis_style, god_modules, - #{limit => 25, + #{limit => 30, ignore => []}}, {elvis_style, used_ignored_variable}, {elvis_style, no_behavior_info}, @@ -161,11 +162,11 @@ #{regex => "^[a-z]([a-z0-9]*_?)*(_SUITE)?$", ignore => []} }, - { - elvis_style, - function_naming_convention, - #{regex => "^([a-z][a-z0-9]*_?)*$"} - }, + % { + % elvis_style, + % function_naming_convention, + % #{regex => "^([a-z][a-z0-9]*_?)*$"} + % }, {elvis_style, state_record_and_type}, {elvis_style, no_spec_with_records} ] diff --git a/rebar.lock b/rebar.lock index 480e2c9cb..91fd6a58a 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,8 +1,8 @@ {"1.1.0", [{<<"accept">>,{pkg,<<"accept">>,<<"0.3.0">>},1}, {<<"antidote_crdt">>,{pkg,<<"antidote_crdt">>,<<"0.1.2">>},0}, - {<<"antidote_pb_codec">>,{pkg,<<"antidote_pb_codec">>,<<"0.0.5">>},0}, - {<<"antidotec_pb">>,{pkg,<<"antidotec_pb">>,<<"0.2.4">>},0}, + {<<"antidote_pb_codec">>,{pkg,<<"antidote_pb_codec">>,<<"0.1.0">>},0}, + {<<"antidotec_pb">>,{pkg,<<"antidotec_pb">>,<<"0.2.7">>},0}, {<<"basho_stats">>,{pkg,<<"basho_stats">>,<<"1.0.3">>},1}, {<<"bear">>,{pkg,<<"bear">>,<<"0.8.7">>},2}, {<<"blume">>,{pkg,<<"blume">>,<<"0.1.1">>},1}, @@ -21,7 +21,10 @@ {<<"gen_fsm_compat">>,{pkg,<<"gen_fsm_compat">>,<<"0.3.0">>},1}, {<<"getopt">>,{pkg,<<"getopt">>,<<"1.0.1">>},2}, {<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1}, - {<<"lager">>,{pkg,<<"lager">>,<<"3.7.0">>},0}, + {<<"lager">>, + {git,"https://github.com/erlang-lager/lager", + {ref,"411edc71ee9823f6ab9c6f617daccce3c6798a29"}}, + 0}, {<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.0">>},2}, {<<"pbkdf2">>,{pkg,<<"pbkdf2">>,<<"2.0.0">>},1}, {<<"poolboy">>,{pkg,<<"basho_poolboy">>,<<"0.8.4">>},1}, @@ -40,8 +43,8 @@ {pkg_hash,[ {<<"accept">>, <<"2505B60BCB992CA79BD03AB7B8FEC8A520A47D9730F286DF1A479CC98B03F94B">>}, {<<"antidote_crdt">>, <<"A92A5ED8918D87AD22557825743C6EAC69DD6089D536E1BF5F9AC80992FA97F8">>}, - {<<"antidote_pb_codec">>, <<"139F291D7E4971DE3920E51D09BD10931AF957656ED6F7D9935B3EA059D88167">>}, - {<<"antidotec_pb">>, <<"9B6A760D75AFF0BCFC6B136DE6E4065B4FB4AED5D6AF18246FDC5B8F7F64C12F">>}, + {<<"antidote_pb_codec">>, <<"D8CC2D69BD25B3961ADECE20954FCB66719AA1DF1D027CF58389C8B7B6EFC739">>}, + {<<"antidotec_pb">>, <<"4CB55E6EB47806B07BBC81BF4727F9CBF6DF2B001FB43B31BFAB01A8047E6CF7">>}, {<<"basho_stats">>, <<"7E1174151509C64FCC1934120ED32295E14F84DAAE7F84926BA2C8D3700D146C">>}, {<<"bear">>, <<"16264309AE5D005D03718A5C82641FCC259C9E8F09ADEB6FD79CA4271168656F">>}, {<<"blume">>, <<"CFB4F43688690BA81C6A79F54E4678CFD5FDEDAB692F277AE740AE4A3897360D">>}, @@ -56,7 +59,6 @@ {<<"gen_fsm_compat">>, <<"5903549F67D595F58A7101154CBE0FDD46955FBFBE40813F1E53C23A970FF5F4">>}, {<<"getopt">>, <<"C73A9FA687B217F2FF79F68A3B637711BB1936E712B521D8CE466B29CBF7808A">>}, {<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>}, - {<<"lager">>, <<"563AB17CD32134A3DD17EC3B3622E6D8F827506AA4F8C489158879BED87D980B">>}, {<<"parse_trans">>, <<"09765507A3C7590A784615CFD421D101AEC25098D50B89D7AA1D66646BC571C1">>}, {<<"pbkdf2">>, <<"11C23279FDED5C0027AB3996CFAE77805521D7EF4BABDE2BD7EC04A9086CF499">>}, {<<"poolboy">>, <<"45C306FF1C9F6451730DD21642EDF55FA72EBD5E2FE4A38D8D8A56B8EA21A256">>}, diff --git a/src/antidote_dc_manager.erl b/src/antidote_dc_manager.erl index 8bfb46764..043c25022 100644 --- a/src/antidote_dc_manager.erl +++ b/src/antidote_dc_manager.erl @@ -77,11 +77,11 @@ create_dc(Nodes) -> wait_until_ring_converged(Nodes), ok = wait_until(hd(Nodes), fun wait_init:check_ready/1), %% starts metadata services needed for intra-dc communication - ok = inter_dc_manager:start_bg_processes(stable), + ok = inter_dc_manager:start_bg_processes(stable_time_functions), ok. %% Start receiving updates from other DCs --spec subscribe_updates_from([#descriptor{}]) -> ok. +-spec subscribe_updates_from([descriptor()]) -> ok. subscribe_updates_from(DCDescriptors) -> _Connected = inter_dc_manager:observe_dcs_sync(DCDescriptors), %%TODO Check return for errors @@ -89,7 +89,7 @@ subscribe_updates_from(DCDescriptors) -> ok. %% Get the DC connection descriptor to be given to other DCs --spec get_connection_descriptor() -> {ok, #descriptor{}}. +-spec get_connection_descriptor() -> {ok, descriptor()}. get_connection_descriptor() -> inter_dc_manager:get_descriptor(). diff --git a/src/antidote_pb_process.erl b/src/antidote_pb_process.erl index b59d3af61..946926890 100644 --- a/src/antidote_pb_process.erl +++ b/src/antidote_pb_process.erl @@ -37,24 +37,27 @@ -export([process/1]). --spec decode_clock(binary()) -> snapshot_time() | ignore. -decode_clock(Clock) -> +-spec from_bin(binary()) -> snapshot_time() | ignore | txid(). +from_bin(Clock) -> case Clock of undefined -> ignore; _ -> binary_to_term(Clock) end. +-spec encode_clock(snapshot_time() | txid()) -> binary(). +encode_clock(TxId) -> + term_to_binary(TxId). --spec process(antidote_pb_codec:request()) -> antidote_pb_codec:response(). -process({start_transaction, {Clock, Properties}}) -> - Response = antidote:start_transaction(decode_clock(Clock), Properties), +-spec process(antidote_pb_codec:request()) -> antidote_pb_codec:response_in(). +process({start_transaction, Clock, Properties}) -> + Response = antidote:start_transaction(from_bin(Clock), Properties), case Response of - {ok, TxId} -> {start_transaction_response, {ok, TxId}}; + {ok, TxId} -> {start_transaction_response, {ok, encode_clock(TxId)}}; {error, Reason} -> {start_transaction_response, {error, Reason}} end; process({abort_transaction, TxId}) -> - Response = antidote:abort_transaction(TxId), + Response = antidote:abort_transaction(from_bin(TxId)), case Response of ok -> {operation_response, ok}; {error, Reason} -> {operation_response, {error, Reason}} @@ -64,39 +67,39 @@ process({abort_transaction, TxId}) -> end; process({commit_transaction, TxId}) -> - Response = antidote:commit_transaction(TxId), + Response = antidote:commit_transaction(from_bin(TxId)), case Response of {error, Reason} -> {commit_response, {error, Reason}}; - {ok, CommitTime} -> {commit_response, {ok, CommitTime}} + {ok, CommitTime} -> {commit_response, {ok, encode_clock(CommitTime)}} end; -process({update_objects, {Updates, TxId}}) -> - Response = antidote:update_objects(Updates, TxId), +process({update_objects, Updates, TxId}) -> + Response = antidote:update_objects(Updates, from_bin(TxId)), case Response of {error, Reason} -> {operation_response, {error, Reason}}; ok -> {operation_response, ok} end; -process({static_update_objects, {Clock, Properties, Updates}}) -> - Response = antidote:update_objects(decode_clock(Clock), Properties, Updates), +process({static_update_objects, Clock, Properties, Updates}) -> + Response = antidote:update_objects(from_bin(Clock), Properties, Updates), case Response of {error, Reason} -> {commit_response, {error, Reason}}; - {ok, CommitTime} -> {commit_response, {ok, CommitTime}} + {ok, CommitTime} -> {commit_response, {ok, encode_clock(CommitTime)}} end; -process({read_objects, {Objects, TxId}}) -> - Response = antidote:read_objects(Objects, TxId), +process({read_objects, Objects, TxId}) -> + Response = antidote:read_objects(Objects, from_bin(TxId)), case Response of {error, Reason} -> {read_objects_response, {error, Reason}}; {ok, Results} -> {read_objects_response, {ok, lists:zip(Objects, Results)}} end; -process({static_read_objects, {Clock, Properties, Objects}}) -> - Response = antidote:read_objects(decode_clock(Clock), Properties, Objects), +process({static_read_objects, Clock, Properties, Objects}) -> + Response = antidote:read_objects(from_bin(Clock), Properties, Objects), case Response of - {error, Reason} ->{commit_response, {error, Reason}}; - {ok, Results, CommitTime} -> {static_read_objects_response, {ok, lists:zip(Objects, Results), CommitTime}} + {error, Reason} -> {error_response, {error, Reason}}; + {ok, Results, CommitTime} -> {static_read_objects_response, {lists:zip(Objects, Results), encode_clock(CommitTime)}} end; process({create_dc, NodeNames}) -> @@ -106,30 +109,33 @@ process({create_dc, NodeNames}) -> catch Error:Reason -> %% Some error, return unsuccess. TODO: correct error response logger:info("Create DC Failed ~p : ~p", [Error, Reason]), - {operation_response, {error, create_dc_failed}} + {operation_response, {error, unknown}} end; -process({get_connection_descriptor}) -> +process(get_connection_descriptor) -> try {ok, Descriptor} = antidote_dc_manager:get_connection_descriptor(), + logger:info("Conection Descriptor: ~p", [Descriptor]), {get_connection_descriptor_resp, {ok, term_to_binary(Descriptor)}} catch Error:Reason -> %% Some error, return unsuccess. TODO: correct error response - logger:info("Get Conection Descriptor ~p : ~p", [Error, Reason]), - {get_connection_descriptor_resp, {error, no_clue}} + logger:info("Failed Conection Descriptor ~p : ~p", [Error, Reason]), + {get_connection_descriptor_resp, {error, unknown}} end; -process({connect_to_dcs, Descriptors}) -> +process({connect_to_dcs, BinDescriptors}) -> try + Descriptors = [binary_to_term(D) || D <- BinDescriptors], + logger:info("Conection Descriptor: ~p", [Descriptors]), ok = antidote_dc_manager:subscribe_updates_from(Descriptors), {operation_response, ok} catch Error:Reason -> %% Some error, return unsuccess. TODO: correct error response logger:info("Connect to DCs Failed ~p : ~p", [Error, Reason]), - {operation_response, {error, connect_to_dcs_failed}} - end; + {operation_response, {error, unknown}} + end. -process(Message) -> - logger:error("Received unhandled message ~p~n", [Message]), - MessageStr = erlang:iolist_to_binary(io_lib:format("~p", [Message])), - {error_response, {unknown, <<"Unhandled message ", MessageStr/binary>>}}. +% process(Message) -> +% logger:error("Received unhandled message ~p~n", [Message]), +% MessageStr = erlang:iolist_to_binary(io_lib:format("~p", [Message])), +% {error_response, {unknown, <<"Unhandled message ", MessageStr/binary>>}}. diff --git a/src/antidote_pb_protocol.erl b/src/antidote_pb_protocol.erl index f1a49e6f2..fa4cb01ff 100644 --- a/src/antidote_pb_protocol.erl +++ b/src/antidote_pb_protocol.erl @@ -67,22 +67,19 @@ loop(Socket, Transport) -> % handles a single request -spec handle(_Socket, _Transport, binary()) -> ok. handle(Socket, Transport, Msg) -> - % A message consists of an 8 bit message code and the actual protocol buffer message: - <> = Msg, - DecodedMessage = antidote_pb_codec:decode_message(antidote_pb_codec:decode_msg(MsgCode, ProtoBufMsg)), + DecodedMessage = antidote_pb_codec:decode_request(Msg), try Response = antidote_pb_process:process(DecodedMessage), - PbResponse = antidote_pb_codec:encode_message(Response), - PbMessage = antidote_pb_codec:encode_msg(PbResponse), + PbMessage = antidote_pb_codec:encode_response(Response), ok = Transport:send(Socket, PbMessage) catch - ExceptionType:Error -> + ExceptionType:Error:StackTrace -> % log errors and reply with error message: - logger:error("Error ~p: ~p~nWhen handling request ~p~n", [ExceptionType, Error, DecodedMessage]), + logger:error("Error ~p: ~p~n~p~bWhen handling request ~p~n", [ExceptionType, Error, StackTrace, DecodedMessage]), % when formatting the error message, we use a maximum depth of 9001. % This should be big enough to include useful information, but avoids sending a lot of data - MessageStr = erlang:iolist_to_binary(io_lib:format("~P: ~P~n", [ExceptionType, 9001, Error, 9001])), - Message = antidote_pb_codec:encode_msg(antidote_pb_codec:encode_message({error_response, {unknown, MessageStr}})), + MessageStr = erlang:iolist_to_binary(io_lib:format("~P: ~P~n~P~n", [ExceptionType, 9001, Error, 9001, StackTrace, 9001])), + Message = antidote_pb_codec:encode_response({error_response, {unknown, MessageStr}}), ok = Transport:send(Socket, Message), ok end. diff --git a/src/clocksi_interactive_coord.erl b/src/clocksi_interactive_coord.erl index f190fe891..7e4590952 100644 --- a/src/clocksi_interactive_coord.erl +++ b/src/clocksi_interactive_coord.erl @@ -253,7 +253,7 @@ finish_op(From, Key, Result) -> %% state: state of the transaction: {active|prepared|committing|committed} %%---------------------------------------------------------------------- --record(coord_state, { +-record(state, { from :: undefined | {pid(), term()} | pid(), transaction :: undefined | tx(), updated_partitions :: list(), @@ -274,6 +274,7 @@ finish_op(From, Key, Result) -> stay_alive :: boolean() }). +-type state() :: #state{}. %%%=================================================================== %%% States %%%=================================================================== @@ -290,7 +291,7 @@ init([From, ClientClock, Properties, StayAlive]) -> init([From, ClientClock, Properties, StayAlive, Operations]) -> BaseState = init_state(StayAlive, true, true, Properties), State = start_tx_internal(From, ClientClock, Properties, BaseState), - {ok, execute_op, State#coord_state{operations = Operations, from = From}, [{state_timeout, 0, timeout}]}. + {ok, execute_op, State#state{operations = Operations, from = From}, [{state_timeout, 0, timeout}]}. %%%== execute_op @@ -299,7 +300,7 @@ init([From, ClientClock, Properties, StayAlive, Operations]) -> %% operation, wait for it to finish (synchronous) and go to the prepareOP %% to execute the next operation. %% internal state timeout -execute_op(state_timeout, timeout, State = #coord_state{operations = Operations, from = From}) -> +execute_op(state_timeout, timeout, State = #state{operations = Operations, from = From}) -> execute_op({call, From}, Operations, State); %% update kept for backwards compatibility with tests. @@ -340,19 +341,19 @@ execute_op({call, Sender}, {OpType, Args}, State) -> %% updated partitions, and go to the "receive_committed" state. %% This state expects other process to sen the commit message to %% start the commit phase. -committing_2pc({call, Sender}, commit, State = #coord_state{transaction = Transaction, +committing_2pc({call, Sender}, commit, State = #state{transaction = Transaction, updated_partitions = UpdatedPartitions, commit_time = CommitTime}) -> NumToAck = length(UpdatedPartitions), case NumToAck of 0 -> - case reply_to_client(State#coord_state{state = committed_read_only, from = Sender}) of + case reply_to_client(State#state{state = committed_read_only, from = Sender}) of {start_tx, Data} -> {next_state, start_tx, Data}; {stop, normal, Data} -> {stop, normal, Data} end; _ -> ok = ?CLOCKSI_VNODE:commit(UpdatedPartitions, Transaction, CommitTime), - {next_state, receive_committed, State#coord_state{num_to_ack = NumToAck, from = Sender, state = committing}} + {next_state, receive_committed, State#state{num_to_ack = NumToAck, from = Sender, state = committing}} end. %%%== receive_prepared @@ -396,7 +397,7 @@ start_tx(cast, {start_tx, From, ClientClock, Properties}, State) -> %% Used by static update and read transactions start_tx(cast, {start_tx, From, ClientClock, Properties, Operation}, State) -> {next_state, execute_op, start_tx_internal(From, ClientClock, Properties, - State#coord_state{is_static = true, operations = Operation, from = From}), [{state_timeout, 0, timeout}]}; + State#state{is_static = true, operations = Operation, from = From}), [{state_timeout, 0, timeout}]}; %% capture regular events (e.g. logging_vnode responses) start_tx(info, {_EventType, EventValue}, State) -> @@ -409,32 +410,32 @@ start_tx(info, {_EventType, EventValue}, State) -> %% updated partitions, and go to the "receive_committed" state. %% This state is used when no commit message from the client is %% expected -committing({call, Sender}, commit, State = #coord_state{transaction = Transaction, +committing({call, Sender}, commit, State = #state{transaction = Transaction, updated_partitions = UpdatedPartitions, commit_time = Commit_time}) -> NumToAck = length(UpdatedPartitions), case NumToAck of 0 -> - case reply_to_client(State#coord_state{state = committed_read_only, from = Sender}) of + case reply_to_client(State#state{state = committed_read_only, from = Sender}) of {start_tx, Data} -> {next_state, start_tx, Data}; {stop, normal, Data} -> {stop, normal, Data} end; _ -> ok = ?CLOCKSI_VNODE:commit(UpdatedPartitions, Transaction, Commit_time), {next_state, receive_committed, - State#coord_state{num_to_ack = NumToAck, from = Sender, state = committing}} + State#state{num_to_ack = NumToAck, from = Sender, state = committing}} end. %%%== single_committing %% @doc TODO -single_committing(cast, {committed, CommitTime}, State = #coord_state{from = From, full_commit = FullCommit}) -> +single_committing(cast, {committed, CommitTime}, State = #state{from = From, full_commit = FullCommit}) -> case FullCommit of false -> - {next_state, committing_single, State#coord_state{commit_time = CommitTime, state = committing}, + {next_state, committing_single, State#state{commit_time = CommitTime, state = committing}, [{reply, From, {ok, CommitTime}}]}; true -> - case reply_to_client(State#coord_state{prepare_time = CommitTime, commit_time = CommitTime, state = committed}) of + case reply_to_client(State#state{prepare_time = CommitTime, commit_time = CommitTime, state = committed}) of {start_tx, Data} -> {next_state, start_tx, Data}; {stop, normal, Data} -> {stop, normal, Data} end @@ -462,15 +463,15 @@ single_committing(info, {_EventType, EventValue}, State) -> %% Should we retry sending the aborted message if we don't receive a %% reply from every partition? %% What delivery guarantees does sending messages provide? -receive_aborted(cast, ack_abort, State = #coord_state{num_to_ack = NumToAck}) -> +receive_aborted(cast, ack_abort, State = #state{num_to_ack = NumToAck}) -> case NumToAck of 1 -> - case reply_to_client(State#coord_state{state = aborted}) of + case reply_to_client(State#state{state = aborted}) of {start_tx, Data} -> {next_state, start_tx, Data}; {stop, normal, Data} -> {stop, normal, Data} end; _ -> - {next_state, receive_aborted, State#coord_state{num_to_ack = NumToAck - 1}} + {next_state, receive_aborted, State#state{num_to_ack = NumToAck - 1}} end; receive_aborted(cast, _, State) -> {next_state, receive_aborted, State}; @@ -483,7 +484,7 @@ receive_aborted(info, {_EventType, EventValue}, State) -> %%%== receive_read_objects_result %% @doc After asynchronously reading a batch of keys, collect the responses here -receive_read_objects_result(cast, {ok, {Key, Type, Snapshot}}, CoordState = #coord_state{ +receive_read_objects_result(cast, {ok, {Key, Type, Snapshot}}, CoordState = #state{ num_to_read = NumToRead, return_accumulator = ReadKeys }) -> @@ -496,14 +497,14 @@ receive_read_objects_result(cast, {ok, {Key, Type, Snapshot}}, CoordState = #coo %% Loop back to the same state until we process all the replies case NumToRead > 1 of true -> - {next_state, receive_read_objects_result, CoordState#coord_state{ + {next_state, receive_read_objects_result, CoordState#state{ num_to_read = NumToRead - 1, return_accumulator = ReadValues }}; false -> - {next_state, execute_op, CoordState#coord_state{num_to_read = 0}, - [{reply, CoordState#coord_state.from, {ok, lists:reverse(ReadValues)}}]} + {next_state, execute_op, CoordState#state{num_to_read = 0}, + [{reply, CoordState#state.from, {ok, lists:reverse(ReadValues)}}]} end; %% capture regular events (e.g. logging_vnode responses) @@ -521,7 +522,7 @@ receive_logging_responses(state_timeout, timeout, State) -> %% sends a log operation per update, to the vnode responsible of the updated %% key. After sending all those messages, the coordinator reaches this state %% to receive the responses of the vnodes. -receive_logging_responses(cast, Response, State = #coord_state{ +receive_logging_responses(cast, Response, State = #state{ is_static = IsStatic, num_to_read = NumToReply, return_accumulator = ReturnAcc @@ -536,7 +537,7 @@ receive_logging_responses(cast, Response, State = #coord_state{ %% Loop back to the same state until we process all the replies case NumToReply > 1 of true -> - {next_state, receive_logging_responses, State#coord_state{ + {next_state, receive_logging_responses, State#state{ num_to_read=NumToReply - 1, return_accumulator=NewAcc }}; @@ -559,8 +560,8 @@ receive_logging_responses(cast, Response, State = #coord_state{ {stop, normal, Data} end; false -> - {next_state, execute_op, State#coord_state{num_to_read=0, return_accumulator=[]}, - [{reply, State#coord_state.from, NewAcc}]} + {next_state, execute_op, State#state{num_to_read=0, return_accumulator=[]}, + [{reply, State#state.from, NewAcc}]} end; _ -> @@ -584,15 +585,15 @@ receive_logging_responses(info, {_EventType, EventValue}, State) -> %% Should we retry sending the committed message if we don't receive a %% reply from every partition? %% What delivery guarantees does sending messages provide? -receive_committed(cast, committed, State = #coord_state{num_to_ack = NumToAck}) -> +receive_committed(cast, committed, State = #state{num_to_ack = NumToAck}) -> case NumToAck of 1 -> - case reply_to_client(State#coord_state{state = committed}) of + case reply_to_client(State#state{state = committed}) of {start_tx, Data} -> {next_state, start_tx, Data}; {stop, normal, Data} -> {stop, normal, Data} end; _ -> - {next_state, receive_committed, State#coord_state{num_to_ack = NumToAck - 1}} + {next_state, receive_committed, State#state{num_to_ack = NumToAck - 1}} end; %% capture regular events (e.g. logging_vnode responses) @@ -605,8 +606,8 @@ receive_committed(info, {_EventType, EventValue}, State) -> %% @doc There was only a single partition with an update in this transaction %% so the transaction has already been committed %% so just wait for the commit message from the client -committing_single({call, Sender}, commit, State = #coord_state{commit_time = Commit_time}) -> - case reply_to_client(State#coord_state{ +committing_single({call, Sender}, commit, State = #state{commit_time = Commit_time}) -> + case reply_to_client(State#state{ prepare_time = Commit_time, from = Sender, commit_time = Commit_time, @@ -639,7 +640,7 @@ callback_mode() -> state_functions. %% @doc TODO init_state(StayAlive, FullCommit, IsStatic, Properties) -> - #coord_state{ + #state{ transaction = undefined, updated_partitions = [], client_ops = [], @@ -657,7 +658,7 @@ init_state(StayAlive, FullCommit, IsStatic, Properties) -> %% @doc TODO -start_tx_internal(From, ClientClock, Properties, State = #coord_state{stay_alive = StayAlive, is_static = IsStatic}) -> +start_tx_internal(From, ClientClock, Properties, State = #state{stay_alive = StayAlive, is_static = IsStatic}) -> {Transaction, TransactionId} = create_transaction_record(ClientClock, StayAlive, From, false, Properties), _ = case IsStatic of true -> ok; @@ -665,7 +666,7 @@ start_tx_internal(From, ClientClock, Properties, State = #coord_state{stay_alive end, % a new transaction was started, increment metrics ?PROMETHEUS_GAUGE:inc(antidote_open_transactions), - State#coord_state{transaction = Transaction, num_to_read = 0, properties = Properties}. + State#state{transaction = Transaction, num_to_read = 0, properties = Properties}. %% @doc TODO @@ -704,7 +705,7 @@ create_transaction_record(ClientClock, StayAlive, From, _IsStatic, Properties) - %% @doc Execute the commit protocol execute_command(prepare, Protocol, Sender, State0) -> - State = State0#coord_state{from=Sender, commit_protocol=Protocol}, + State = State0#state{from=Sender, commit_protocol=Protocol}, case Protocol of two_phase -> prepare_2pc(State); @@ -714,10 +715,10 @@ execute_command(prepare, Protocol, Sender, State0) -> %% @doc Abort the current transaction execute_command(abort, _Protocol, Sender, State) -> - abort(State#coord_state{from=Sender}); + abort(State#state{from=Sender}); %% @doc Perform a single read, synchronous -execute_command(read, {Key, Type}, Sender, State = #coord_state{ +execute_command(read, {Key, Type}, Sender, State = #state{ transaction=Transaction, updated_partitions=UpdatedPartitions }) -> @@ -729,36 +730,36 @@ execute_command(read, {Key, Type}, Sender, State = #coord_state{ end; %% @doc Read a batch of objects, asynchronous -execute_command(read_objects, Objects, Sender, State = #coord_state{transaction=Transaction}) -> +execute_command(read_objects, Objects, Sender, State = #state{transaction=Transaction}) -> ExecuteReads = fun({Key, Type}, AccState) -> ?PROMETHEUS_COUNTER:inc(antidote_operations_total, [read_async]), Partition = ?LOG_UTIL:get_key_partition(Key), ok = clocksi_vnode:async_read_data_item(Partition, Transaction, Key, Type), - ReadKeys = AccState#coord_state.return_accumulator, - AccState#coord_state{return_accumulator=[Key | ReadKeys]} + ReadKeys = AccState#state.return_accumulator, + AccState#state{return_accumulator=[Key | ReadKeys]} end, NewCoordState = lists:foldl( ExecuteReads, - State#coord_state{num_to_read = length(Objects), return_accumulator=[]}, + State#state{num_to_read = length(Objects), return_accumulator=[]}, Objects ), - {receive_read_objects_result, NewCoordState#coord_state{from=Sender}}; + {receive_read_objects_result, NewCoordState#state{from=Sender}}; %% @doc Perform update operations on a batch of Objects -execute_command(update_objects, UpdateOps, Sender, State = #coord_state{transaction=Transaction}) -> - ExecuteUpdates = fun(Op, AccState=#coord_state{ +execute_command(update_objects, UpdateOps, Sender, State = #state{transaction=Transaction}) -> + ExecuteUpdates = fun(Op, AccState=#state{ client_ops = ClientOps0, updated_partitions = UpdatedPartitions0 }) -> case perform_update(Op, UpdatedPartitions0, Transaction, Sender, ClientOps0) of {error, _} = Err -> - AccState#coord_state{return_accumulator = Err}; + AccState#state{return_accumulator = Err}; {UpdatedPartitions, ClientOps} -> - NumToRead = AccState#coord_state.num_to_read, - AccState#coord_state{ + NumToRead = AccState#state.num_to_read, + AccState#state{ client_ops=ClientOps, num_to_read=NumToRead + 1, updated_partitions=UpdatedPartitions @@ -768,12 +769,12 @@ execute_command(update_objects, UpdateOps, Sender, State = #coord_state{transact NewCoordState = lists:foldl( ExecuteUpdates, - State#coord_state{num_to_read=0, return_accumulator=ok}, + State#state{num_to_read=0, return_accumulator=ok}, UpdateOps ), - LoggingState = NewCoordState#coord_state{from=Sender}, - case LoggingState#coord_state.num_to_read > 0 of + LoggingState = NewCoordState#state{from=Sender}, + case LoggingState#state.num_to_read > 0 of true -> {receive_logging_responses, LoggingState}; false -> @@ -783,7 +784,7 @@ execute_command(update_objects, UpdateOps, Sender, State = #coord_state{transact %% @doc function called when 2pc is forced independently of the number of partitions %% involved in the txs. -prepare_2pc(State = #coord_state{ +prepare_2pc(State = #state{ transaction = Transaction, updated_partitions = UpdatedPartitions, full_commit = FullCommit, from = From}) -> case UpdatedPartitions of @@ -791,32 +792,33 @@ prepare_2pc(State = #coord_state{ SnapshotTime = Transaction#transaction.snapshot_time_local, case FullCommit of false -> - {committing_2pc, State#coord_state{state = committing, commit_time = SnapshotTime}, + {committing_2pc, State#state{state = committing, commit_time = SnapshotTime}, [{reply, From, {ok, SnapshotTime}}]}; true -> - reply_to_client(State#coord_state{state = committed_read_only}) + reply_to_client(State#state{state = committed_read_only}) end; [_|_] -> ok = ?CLOCKSI_VNODE:prepare(UpdatedPartitions, Transaction), Num_to_ack = length(UpdatedPartitions), {receive_prepared, - State#coord_state{num_to_ack = Num_to_ack, state = prepared}} + State#state{num_to_ack = Num_to_ack, state = prepared}} end. %% @doc when the transaction has committed or aborted, %% a reply is sent to the client that started the transaction. -reply_to_client(State = #coord_state{ - from=From, - state=TxState, - is_static=IsStatic, - stay_alive=StayAlive, - client_ops=ClientOps, - commit_time=CommitTime, - full_commit=FullCommit, - transaction=Transaction, - return_accumulator=ReturnAcc - }) -> + +reply_to_client(State = #state{ + from=From, + state=TxState, + is_static=IsStatic, + stay_alive=StayAlive, + client_ops=ClientOps, + commit_time=CommitTime, + full_commit=FullCommit, + transaction=Transaction, + return_accumulator=ReturnAcc +}) -> _ = case From of undefined -> ok; @@ -879,10 +881,10 @@ reply_to_client(State = #coord_state{ %% @doc The following function is used to apply the updates that were performed by the running %% transaction, to the result returned by a read. --spec apply_tx_updates_to_snapshot (key(), #coord_state{}, type(), snapshot()) -> snapshot(). +-spec apply_tx_updates_to_snapshot (key(), state(), type(), snapshot()) -> snapshot(). apply_tx_updates_to_snapshot(Key, CoordState, Type, Snapshot)-> Partition = ?LOG_UTIL:get_key_partition(Key), - Found = lists:keyfind(Partition, 1, CoordState#coord_state.updated_partitions), + Found = lists:keyfind(Partition, 1, CoordState#state.updated_partitions), case Found of false -> @@ -1041,7 +1043,7 @@ async_log_propagation(Partition, TxId, Key, Type, Record) -> %% @doc this function sends a prepare message to all updated partitions and goes %% to the "receive_prepared"state. -prepare(State = #coord_state{ +prepare(State = #state{ from=From, num_to_read=NumToRead, full_commit=FullCommit, @@ -1055,28 +1057,28 @@ prepare(State = #coord_state{ 0 -> case FullCommit of true -> - reply_to_client(State#coord_state{state = committed_read_only}); + reply_to_client(State#state{state = committed_read_only}); false -> - {committing, State#coord_state{state = committing, commit_time = SnapshotTimeLocal}, + {committing, State#state{state = committing, commit_time = SnapshotTimeLocal}, [{reply, From, {ok, SnapshotTimeLocal}}]} end; _ -> - {receive_prepared, State#coord_state{state = prepared}} + {receive_prepared, State#state{state = prepared}} end; [_] -> ok = ?CLOCKSI_VNODE:single_commit(UpdatedPartitions, Transaction), - {single_committing, State#coord_state{state = committing, num_to_ack = 1}}; + {single_committing, State#state{state = committing, num_to_ack = 1}}; [_|_] -> ok = ?CLOCKSI_VNODE:prepare(UpdatedPartitions, Transaction), Num_to_ack = length(UpdatedPartitions), - {receive_prepared, State#coord_state{num_to_ack = Num_to_ack, state = prepared}} + {receive_prepared, State#state{num_to_ack = Num_to_ack, state = prepared}} end. -process_prepared(ReceivedPrepareTime, State = #coord_state{num_to_ack = NumToAck, +process_prepared(ReceivedPrepareTime, State = #state{num_to_ack = NumToAck, commit_protocol = CommitProtocol, full_commit = FullCommit, from = From, prepare_time = PrepareTime, transaction = Transaction, @@ -1089,9 +1091,9 @@ process_prepared(ReceivedPrepareTime, State = #coord_state{num_to_ack = NumToAck true -> ok = ?CLOCKSI_VNODE:commit(UpdatedPartitions, Transaction, MaxPrepareTime), {receive_committed, - State#coord_state{num_to_ack = length(UpdatedPartitions), commit_time = MaxPrepareTime, state = committing}}; + State#state{num_to_ack = length(UpdatedPartitions), commit_time = MaxPrepareTime, state = committing}}; false -> - {committing_2pc, State#coord_state{ + {committing_2pc, State#state{ prepare_time = MaxPrepareTime, commit_time = MaxPrepareTime, state = committing @@ -1102,14 +1104,14 @@ process_prepared(ReceivedPrepareTime, State = #coord_state{num_to_ack = NumToAck true -> ok = ?CLOCKSI_VNODE:commit(UpdatedPartitions, Transaction, MaxPrepareTime), {receive_committed, - State#coord_state{ + State#state{ num_to_ack = length(UpdatedPartitions), commit_time = MaxPrepareTime, state = committing } }; false -> - {committing, State#coord_state{ + {committing, State#state{ prepare_time = MaxPrepareTime, commit_time = MaxPrepareTime, state = committing @@ -1117,21 +1119,21 @@ process_prepared(ReceivedPrepareTime, State = #coord_state{num_to_ack = NumToAck end end; _ -> - {receive_prepared, State#coord_state{num_to_ack = NumToAck - 1, prepare_time = MaxPrepareTime}} + {receive_prepared, State#state{num_to_ack = NumToAck - 1, prepare_time = MaxPrepareTime}} end. %% @doc when an error occurs or an updated partition %% does not pass the certification check, the transaction aborts. -abort(State = #coord_state{transaction = Transaction, +abort(State = #state{transaction = Transaction, updated_partitions = UpdatedPartitions}) -> NumToAck = length(UpdatedPartitions), case NumToAck of 0 -> - reply_to_client(State#coord_state{state = aborted}); + reply_to_client(State#state{state = aborted}); _ -> ok = ?CLOCKSI_VNODE:abort(UpdatedPartitions, Transaction), - {receive_aborted, State#coord_state{num_to_ack = NumToAck, state = aborted}} + {receive_aborted, State#state{num_to_ack = NumToAck, state = aborted}} end. diff --git a/src/clocksi_materializer.erl b/src/clocksi_materializer.erl index a9c4b06d1..96758af55 100644 --- a/src/clocksi_materializer.erl +++ b/src/clocksi_materializer.erl @@ -82,7 +82,7 @@ get_first_id(Tuple) when is_tuple(Tuple) -> -spec materialize(type(), txid() | ignore, snapshot_time() | ignore, - #snapshot_get_response{} + snapshot_get_response() ) -> {ok, snapshot(), integer(), snapshot_time() | ignore, boolean(), non_neg_integer()} | {error, reason()}. diff --git a/src/clocksi_readitem_server.erl b/src/clocksi_readitem_server.erl index 22d4dd43c..89545f51f 100644 --- a/src/clocksi_readitem_server.erl +++ b/src/clocksi_readitem_server.erl @@ -63,7 +63,7 @@ id :: non_neg_integer(), prepared_cache :: cache_id(), self :: atom()}). - +-type state() :: #state{}. -type read_property_list() :: []. -export_type([read_property_list/0]). %%%=================================================================== @@ -212,7 +212,7 @@ handle_cast({perform_read_cast, Coordinator, Key, Type, Transaction, PropertyLis ok = perform_read_internal(Coordinator, Key, Type, Transaction, PropertyList, SD0), {noreply, SD0}. --spec perform_read_internal(pid(), key(), type(), #transaction{}, read_property_list(), #state{}) -> +-spec perform_read_internal(pid(), key(), type(), tx(), read_property_list(), state()) -> ok. perform_read_internal(Coordinator, Key, Type, Transaction, PropertyList, _SD0 = #state{prepared_cache = PreparedCache, partition = Partition}) -> @@ -265,7 +265,7 @@ check_prepared_list(Key, TxLocalStartTime, [{_TxId, Time}|Rest]) -> %% @doc return: %% - Reads and returns the log of specified Key using replication layer. --spec return({fsm, pid()} | {pid(), term()}, key(), type(), #transaction{}, read_property_list(), partition_id()) -> ok. +-spec return({fsm, pid()} | {pid(), term()}, key(), type(), tx(), read_property_list(), partition_id()) -> ok. return(Coordinator, Key, Type, Transaction, PropertyList, Partition) -> VecSnapshotTime = Transaction#transaction.vec_snapshot_time, TxId = Transaction#transaction.txn_id, diff --git a/src/dc_meta_data_utilities.erl b/src/dc_meta_data_utilities.erl index 2aac66daf..ce42f0c25 100644 --- a/src/dc_meta_data_utilities.erl +++ b/src/dc_meta_data_utilities.erl @@ -187,7 +187,7 @@ get_partition_at_index(Index) -> end. %% Store an external dc descriptor --spec store_dc_descriptors([#descriptor{}]) -> ok. +-spec store_dc_descriptors([descriptor()]) -> ok. store_dc_descriptors(Descriptors) -> MergeFunc = fun(DescList, PrevDict) -> lists:foldl(fun(Desc = #descriptor{dcid = DCID}, Acc) -> @@ -197,7 +197,7 @@ store_dc_descriptors(Descriptors) -> stable_meta_data_server:broadcast_meta_data_merge(external_descriptors, Descriptors, MergeFunc, fun dict:new/0). %% Gets the list of external dc descriptors --spec get_dc_descriptors() -> [#descriptor{}]. +-spec get_dc_descriptors() -> [descriptor()]. get_dc_descriptors() -> case stable_meta_data_server:read_meta_data(external_descriptors) of {ok, Dict} -> diff --git a/src/inter_dc_dep_vnode.erl b/src/inter_dc_dep_vnode.erl index 5f40bc778..f831aa96f 100644 --- a/src/inter_dc_dep_vnode.erl +++ b/src/inter_dc_dep_vnode.erl @@ -69,12 +69,12 @@ last_updated :: non_neg_integer(), drop_ping :: boolean() }). - +-type state() :: #state{}. %%%% API --------------------------------------------------------------------+ %% Passes the received transaction to the dependency buffer. %% At this point no message can be lost (the transport layer must ensure all transactions are delivered reliably). --spec handle_transaction(#interdc_txn{}) -> ok. +-spec handle_transaction(interdc_txn()) -> ok. handle_transaction(Txn=#interdc_txn{partition = P}) -> dc_utilities:call_local_vnode_sync(P, inter_dc_dep_vnode_master, {txn, Txn}). %% After restarting from failure, load the vectorclock of the max times of all the updates received from other DCs @@ -84,7 +84,7 @@ set_dependency_clock(Partition, Vector) -> dc_utilities:call_local_vnode_sync(Pa %%%% VNode methods ----------------------------------------------------------+ --spec init([partition_id()]) -> {ok, #state{}}. +-spec init([partition_id()]) -> {ok, state()}. init([Partition]) -> StableSnapshot = vectorclock:new(), {ok, #state{partition = Partition, queues = dict:new(), vectorclock = StableSnapshot, last_updated = 0, drop_ping = false}}. @@ -93,7 +93,7 @@ start_vnode(I) -> riak_core_vnode_master:get_vnode_pid(I, ?MODULE). %% Check the content of each queue, try to apply as many elements as possible. %% If any element was successfully pushed from any queue, repeat the process. --spec process_all_queues(#state{}) -> #state{}. +-spec process_all_queues(state()) -> state(). process_all_queues(State = #state{queues = Queues}) -> DCIDs = dict:fetch_keys(Queues), {NewState, NumUpdated} = lists:foldl(fun process_queue/2, {State, 0}, DCIDs), @@ -118,7 +118,7 @@ process_queue(DCID, {State, Acc}) -> %% Store the heartbeat message. %% This is not a true transaction, so its dependencies are always satisfied. --spec try_store(#state{}, #interdc_txn{}) -> {#state{}, boolean()}. +-spec try_store(state(), interdc_txn()) -> {state(), boolean()}. try_store(State=#state{drop_ping = true}, #interdc_txn{log_records = []}) -> {State, true}; try_store(State, #interdc_txn{dcid = DCID, timestamp = Timestamp, log_records = []}) -> @@ -185,7 +185,7 @@ handle_overload_info(_, _) -> %%%% Utilities --------------------------------------------------------------+ %% Push the transaction to an appropriate queue inside the state. --spec push_txn(#state{}, #interdc_txn{}) -> #state{}. +-spec push_txn(state(), interdc_txn()) -> state(). push_txn(State = #state{queues = Queues}, Txn = #interdc_txn{dcid = DCID}) -> DCID = Txn#interdc_txn.dcid, Queue = case dict:find(DCID, Queues) of @@ -202,7 +202,7 @@ pop_txn(State = #state{queues = Queues}, DCID) -> State#state{queues = dict:store(DCID, NewQueue, Queues)}. %% Update the clock value associated with the given DCID from the perspective of this partition. --spec update_clock(#state{}, dcid(), non_neg_integer()) -> #state{}. +-spec update_clock(state(), dcid(), non_neg_integer()) -> state(). update_clock(State = #state{last_updated = LastUpdated}, DCID, Timestamp) -> %% Should we decrement the timestamp value by 1? NewClock = vectorclock:set(DCID, Timestamp, State#state.vectorclock), @@ -232,13 +232,13 @@ update_clock(State = #state{last_updated = LastUpdated}, DCID, Timestamp) -> State#state{vectorclock = NewClock, last_updated = NewLastUpdated}. %% Get the current vectorclock from the perspective of this partition, with the updated entry for current DC. --spec get_partition_clock(#state{}) -> vectorclock(). +-spec get_partition_clock(state()) -> vectorclock(). get_partition_clock(State) -> %% Return the vectorclock associated with the current state, but update the local entry with the current timestamp vectorclock:set(dc_meta_data_utilities:get_my_dc_id(), dc_utilities:now_microsec(), State#state.vectorclock). %% Utility function: converts the transaction to a list of clocksi_payload ops. --spec updates_to_clocksi_payloads(#interdc_txn{}) -> list(#clocksi_payload{}). +-spec updates_to_clocksi_payloads(interdc_txn()) -> list(clocksi_payload()). updates_to_clocksi_payloads(Txn = #interdc_txn{dcid = DCID, timestamp = CommitTime, snapshot = SnapshotTime}) -> lists:map(fun(#log_record{log_operation = LogRecord}) -> #update_log_payload{key = Key, type = Type, op = Op} = LogRecord#log_operation.log_payload, diff --git a/src/inter_dc_log_sender_vnode.erl b/src/inter_dc_log_sender_vnode.erl index bbba93969..1937cac62 100644 --- a/src/inter_dc_log_sender_vnode.erl +++ b/src/inter_dc_log_sender_vnode.erl @@ -67,17 +67,17 @@ -record(state, { partition :: partition_id(), buffer, %% log_tx_assembler:state - last_log_id :: #op_number{}, + last_log_id :: op_number(), timer :: any() }). - +-type state() :: #state{}. %%%% API --------------------------------------------------------------------+ %% Send the new operation to the log_sender. %% The transaction will be buffered until all the operations in a transaction are collected, %% and then the transaction will be broadcasted via interDC. %% WARNING: only LOCALLY COMMITED operations (not from remote DCs) should be sent to log_sender_vnode. --spec send(partition_id(), #log_record{}) -> ok. +-spec send(partition_id(), log_record()) -> ok. send(Partition, LogRecord) -> dc_utilities:call_vnode(Partition, inter_dc_log_sender_vnode_master, {log_event, LogRecord}). %% Start the heartbeat timer @@ -86,7 +86,7 @@ start_timer(Partition) -> dc_utilities:call_vnode_sync(Partition, inter_dc_log_s %% After restarting from failure, load the operation id of the last operation sent by this DC %% Otherwise the stable time won't advance as the receiving DC will be thinking it is getting old messages --spec update_last_log_id(partition_id(), #op_number{}) -> ok. +-spec update_last_log_id(partition_id(), op_number()) -> ok. update_last_log_id(Partition, OpId) -> dc_utilities:call_vnode_sync(Partition, inter_dc_log_sender_vnode_master, {update_last_log_id, OpId}). %% Send the stable time to this vnode, no transaction in the future will commit with a smaller time @@ -115,7 +115,7 @@ handle_command({update_last_log_id, OpId}, _Sender, State = #state{partition = P {reply, ok, State#state{last_log_id = OpId}}; %% Handle the new operation -%% -spec handle_command({log_event, #log_record{}}, pid(), #state{}) -> {noreply, #state{}}. +%% -spec handle_command({log_event, log_record()}, pid(), state()) -> {noreply, state()}. handle_command({log_event, LogRecord}, _Sender, State) -> %% Use the txn_assembler to check if the complete transaction was collected. {Result, NewBufState} = log_txn_assembler:process(LogRecord, State#state.buffer), @@ -173,18 +173,18 @@ handle_overload_info(_, _) -> %%%%%%%%%%%%%%%%%%%%%%%% %% Cancels the ping timer, if one is set. --spec del_timer(#state{}) -> #state{}. +-spec del_timer(state()) -> state(). del_timer(State = #state{timer = none}) -> State; del_timer(State = #state{timer = Timer}) -> _ = erlang:cancel_timer(Timer), State#state{timer = none}. %% Cancels the previous ping timer and sets a new one. --spec set_timer(#state{}) -> #state{}. +-spec set_timer(state()) -> state(). set_timer(State) -> set_timer(false, State). --spec set_timer(boolean(), #state{}) -> #state{}. +-spec set_timer(boolean(), state()) -> state(). set_timer(First, State = #state{partition = Partition}) -> case First of true -> @@ -205,7 +205,7 @@ set_timer(First, State = #state{partition = Partition}) -> %% Broadcasts the transaction via local publisher. --spec broadcast(#state{}, #interdc_txn{}) -> #state{}. +-spec broadcast(state(), interdc_txn()) -> state(). broadcast(State, Txn) -> inter_dc_pub:broadcast(Txn), Id = inter_dc_txn:last_log_opid(Txn), diff --git a/src/inter_dc_manager.erl b/src/inter_dc_manager.erl index 0c726cb9f..afc4b0667 100644 --- a/src/inter_dc_manager.erl +++ b/src/inter_dc_manager.erl @@ -46,7 +46,7 @@ forget_dcs/1, drop_ping/1]). --spec get_descriptor() -> {ok, #descriptor{}}. +-spec get_descriptor() -> {ok, descriptor()}. get_descriptor() -> %% Wait until all needed vnodes are spawned, so that the heartbeats are already being sent ok = dc_utilities:ensure_all_vnodes_running_master(inter_dc_log_sender_vnode_master), @@ -64,7 +64,7 @@ get_descriptor() -> %% When a connecting to a new DC, Nodes will be all the nodes in the local DC %% Otherwise this will be called with a single node that is reconnecting (for example after one of the nodes in the DC crashes and restarts) %% Note this is an internal function, to instruct the local DC to connect to a new DC the observe_dcs_sync(Descriptors) function should be used --spec observe_dc(#descriptor{}, [node()]) -> ok | inter_dc_conn_err(). +-spec observe_dc(descriptor(), [node()]) -> ok | inter_dc_conn_err(). observe_dc(Desc = #descriptor{dcid = DCID, partition_num = PartitionsNumRemote, publishers = Publishers, logreaders = LogReaders}, Nodes) -> PartitionsNumLocal = dc_utilities:get_partitions_num(), case PartitionsNumRemote == PartitionsNumLocal of @@ -84,7 +84,7 @@ observe_dc(Desc = #descriptor{dcid = DCID, partition_num = PartitionsNumRemote, end end. --spec connect_nodes([node()], dcid(), [socket_address()], [socket_address()], #descriptor{}, non_neg_integer()) -> +-spec connect_nodes([node()], dcid(), [socket_address()], [socket_address()], descriptor(), non_neg_integer()) -> ok | {error, connection_error}. connect_nodes([], _DCID, _LogReaders, _Publishers, _Desc, _Retries) -> ok; @@ -200,18 +200,18 @@ check_node_restart() -> false end. --spec reconnect_dcs_after_restart([#descriptor{}], node()) -> [ok | inter_dc_conn_err()]. +-spec reconnect_dcs_after_restart([descriptor()], node()) -> [ok | inter_dc_conn_err()]. reconnect_dcs_after_restart(Descriptors, MyNode) -> ok = forget_dcs(Descriptors, [MyNode]), observe_dcs_sync(Descriptors, [MyNode]). %% This should be called when connecting the local DC to a new external DC --spec observe_dcs_sync([#descriptor{}]) -> [ok | inter_dc_conn_err()]. +-spec observe_dcs_sync([descriptor()]) -> [ok | inter_dc_conn_err()]. observe_dcs_sync(Descriptors) -> Nodes = dc_utilities:get_my_dc_nodes(), observe_dcs_sync(Descriptors, Nodes). --spec observe_dcs_sync([#descriptor{}], [node()]) -> [ok | inter_dc_conn_err()]. +-spec observe_dcs_sync([descriptor()], [node()]) -> [ok | inter_dc_conn_err()]. observe_dcs_sync(Descriptors, Nodes) -> {ok, SS} = dc_utilities:get_stable_snapshot(), DCs = lists:map(fun(DC) -> @@ -229,7 +229,7 @@ observe_dcs_sync(Descriptors, Nodes) -> end, DCs), [Result1 || {Result1, _DC1} <- DCs]. --spec forget_dc(#descriptor{}, [node()]) -> ok. +-spec forget_dc(descriptor(), [node()]) -> ok. forget_dc(#descriptor{dcid = DCID}, Nodes) -> case DCID == dc_meta_data_utilities:get_my_dc_id() of true -> ok; @@ -239,12 +239,12 @@ forget_dc(#descriptor{dcid = DCID}, Nodes) -> lists:foreach(fun(Node) -> ok = rpc:call(Node, inter_dc_sub, del_dc, [DCID]) end, Nodes) end. --spec forget_dcs([#descriptor{}]) -> ok. +-spec forget_dcs([descriptor()]) -> ok. forget_dcs(Descriptors) -> Nodes = dc_utilities:get_my_dc_nodes(), forget_dcs(Descriptors, Nodes). --spec forget_dcs([#descriptor{}], [node()]) -> ok. +-spec forget_dcs([descriptor()], [node()]) -> ok. forget_dcs(Descriptors, Nodes) -> lists:foreach(fun(Descriptor) -> forget_dc(Descriptor, Nodes) end , Descriptors). diff --git a/src/inter_dc_pub.erl b/src/inter_dc_pub.erl index 9b8c295af..e9eb848e0 100644 --- a/src/inter_dc_pub.erl +++ b/src/inter_dc_pub.erl @@ -84,7 +84,7 @@ get_address_list() -> Port = application:get_env(antidote, pubsub_port, ?DEFAULT_PUBSUB_PORT), [{Ip1, Port} || Ip1 <- IpList, Ip1 /= {127, 0, 0, 1}]. --spec broadcast(#interdc_txn{}) -> ok. +-spec broadcast(interdc_txn()) -> ok. broadcast(Txn) -> case catch gen_server:call(?MODULE, {publish, inter_dc_txn:to_bin(Txn)}) of {'EXIT', _Reason} -> logger:warning("Failed to broadcast a transaction."); %% this can happen if a node is shutting down. diff --git a/src/inter_dc_query.erl b/src/inter_dc_query.erl index 4e122c72b..3e75c2364 100644 --- a/src/inter_dc_query.erl +++ b/src/inter_dc_query.erl @@ -73,7 +73,7 @@ %% the second is a #request_cache_entry{} record %% Note that the function should not perform anywork, instead just send %%% the work to another thread, otherwise it will block other messages --spec perform_request(inter_dc_message_type(), pdcid(), binary(), fun((binary(), #request_cache_entry{})->ok)) +-spec perform_request(inter_dc_message_type(), pdcid(), binary(), fun((binary(), request_cache_entry()) -> ok)) -> ok | unknown_dc. perform_request(RequestType, PDCID, BinaryRequest, Func) -> gen_server:call(?MODULE, {any_request, RequestType, PDCID, BinaryRequest, Func}). diff --git a/src/inter_dc_query_receive_socket.erl b/src/inter_dc_query_receive_socket.erl index 02471e472..a17d4bd2a 100644 --- a/src/inter_dc_query_receive_socket.erl +++ b/src/inter_dc_query_receive_socket.erl @@ -90,7 +90,7 @@ get_address_list() -> AddressList = [{Ip1, Port} || Ip1 <- IpList, Ip1 /= {127, 0, 0, 1}], {PartitionList, AddressList}. --spec send_response(binary(), #inter_dc_query_state{}) -> ok. +-spec send_response(binary(), inter_dc_query_state()) -> ok. send_response(BinaryResponse, QueryState = #inter_dc_query_state{local_pid=Sender}) -> ok = gen_server:cast(Sender, {send_response, BinaryResponse, QueryState}). diff --git a/src/inter_dc_query_response.erl b/src/inter_dc_query_response.erl index 578f80da3..ab5aa8dc3 100644 --- a/src/inter_dc_query_response.erl +++ b/src/inter_dc_query_response.erl @@ -54,11 +54,11 @@ start_link(Num) -> gen_server:start_link({local, generate_server_name(Num)}, ?MODULE, [Num], []). --spec get_entries(binary(), #inter_dc_query_state{}) -> ok. +-spec get_entries(binary(), inter_dc_query_state()) -> ok. get_entries(BinaryQuery, QueryState) -> ok = gen_server:cast(generate_server_name(rand_compat:uniform(?INTER_DC_QUERY_CONCURRENCY)), {get_entries, BinaryQuery, QueryState}). --spec request_permissions(binary(), #inter_dc_query_state{}) -> ok. +-spec request_permissions(binary(), inter_dc_query_state()) -> ok. request_permissions(BinaryRequest, QueryState) -> ok = gen_server:cast(generate_server_name(rand_compat:uniform(?INTER_DC_QUERY_CONCURRENCY)), {request_permissions, BinaryRequest, QueryState}). @@ -94,7 +94,7 @@ handle_call(_Info, _From, State) -> handle_info(_Info, State) -> {noreply, State}. --spec get_entries_internal(partition_id(), log_opid(), log_opid()) -> [#interdc_txn{}]. +-spec get_entries_internal(partition_id(), log_opid(), log_opid()) -> [interdc_txn()]. get_entries_internal(Partition, From, To) -> Node = case lists:member(Partition, dc_utilities:get_my_partitions()) of true -> node(); @@ -111,13 +111,13 @@ get_entries_internal(Partition, From, To) -> %% TODO: re-implement this method efficiently once the log provides efficient access by partition and DC (Santiago, here!) %% TODO: also fix the method to provide complete snapshots if the log was trimmed --spec log_read_range(partition_id(), node(), log_opid(), log_opid()) -> [#log_record{}]. +-spec log_read_range(partition_id(), node(), log_opid(), log_opid()) -> [log_record()]. log_read_range(Partition, Node, From, To) -> {ok, RawOpList} = logging_vnode:read({Partition, Node}, [Partition]), OpList = lists:map(fun({_Partition, Op}) -> Op end, RawOpList), filter_operations(OpList, From, To). --spec filter_operations([#log_record{}], log_opid(), log_opid()) -> [#log_record{}]. +-spec filter_operations([log_record()], log_opid(), log_opid()) -> [log_record()]. filter_operations(Ops, Min, Max) -> F = fun(Op) -> Num = Op#log_record.op_number#op_number.local, diff --git a/src/inter_dc_sub_buf.erl b/src/inter_dc_sub_buf.erl index 1dfca20c3..c5b27b341 100644 --- a/src/inter_dc_sub_buf.erl +++ b/src/inter_dc_sub_buf.erl @@ -43,7 +43,7 @@ %%%% API --------------------------------------------------------------------+ %% TODO: Fetch last observed ID from durable storage (maybe log?). This way, in case of a node crash, the queue can be fetched again. --spec new_state(pdcid()) -> #inter_dc_sub_buf{}. +-spec new_state(pdcid()) -> inter_dc_sub_buf(). new_state(PDCID) -> {ok, EnableLogging} = application:get_env(antidote, enable_logging), #inter_dc_sub_buf{ @@ -54,7 +54,7 @@ new_state(PDCID) -> logging_enabled = EnableLogging }. --spec process({txn, #interdc_txn{}} | {log_reader_resp, [#interdc_txn{}]}, #inter_dc_sub_buf{}) -> #inter_dc_sub_buf{}. +-spec process({txn, interdc_txn()} | {log_reader_resp, [interdc_txn()]}, inter_dc_sub_buf()) -> inter_dc_sub_buf(). process({txn, Txn}, State = #inter_dc_sub_buf{last_observed_opid = init, pdcid = {DCID, Partition}}) -> %% If this is the first txn received (i.e. if last_observed_opid = init) then check the log %% to see if there was a previous op received (i.e. in the case of fail and restart) so that @@ -141,12 +141,12 @@ process_queue(State = #inter_dc_sub_buf{queue = Queue, last_observed_opid = Last end end. --spec deliver(#interdc_txn{}) -> ok. +-spec deliver(interdc_txn()) -> ok. deliver(Txn) -> inter_dc_dep_vnode:handle_transaction(Txn). %% TODO: consider dropping messages if the queue grows too large. %% The lost messages would be then fetched again by the log_reader. --spec push(#interdc_txn{}, #inter_dc_sub_buf{}) -> #inter_dc_sub_buf{}. +-spec push(interdc_txn(), inter_dc_sub_buf()) -> inter_dc_sub_buf(). push(Txn, State) -> State#inter_dc_sub_buf{queue = queue:in(Txn, State#inter_dc_sub_buf.queue)}. %% Instructs the log reader to ask the remote DC for a given range of operations. diff --git a/src/inter_dc_sub_vnode.erl b/src/inter_dc_sub_vnode.erl index e2a51538e..e933af392 100644 --- a/src/inter_dc_sub_vnode.erl +++ b/src/inter_dc_sub_vnode.erl @@ -61,17 +61,18 @@ %% State -record(state, { partition :: non_neg_integer(), - buffer_fsms :: dict:dict(dcid(), #inter_dc_sub_buf{}) %% dcid -> buffer + buffer_fsms :: dict:dict(dcid(), inter_dc_sub_buf()) %% dcid -> buffer }). +-type state() :: #state{}. %%%% API --------------------------------------------------------------------+ --spec deliver_txn(#interdc_txn{}) -> ok. +-spec deliver_txn(interdc_txn()) -> ok. deliver_txn(Txn) -> call(Txn#interdc_txn.partition, {txn, Txn}). %% This function is called with the response from the log request operations request %% when some messages were lost --spec deliver_log_reader_resp(binary(), #request_cache_entry{}) -> ok. +-spec deliver_log_reader_resp(binary(), request_cache_entry()) -> ok. deliver_log_reader_resp(BinaryRep, _RequestCacheEntry) -> <> = BinaryRep, call(Partition, {log_reader_resp, RestBinary}). @@ -87,7 +88,7 @@ handle_command({txn, Txn = #interdc_txn{dcid = DCID}}, _Sender, State) -> {noreply, set_buf(DCID, Buf1, State)}; handle_command({log_reader_resp, BinaryRep}, _Sender, State) -> - %% The binary reply is type {pdcid(), [#interdc_txn{}]} + %% The binary reply is type {pdcid(), [interdc_txn()]} {{DCID, _Partition}, Txns} = binary_to_term(BinaryRep), Buf0 = get_buf(DCID, State), Buf1 = inter_dc_sub_buf:process({log_reader_resp, Txns}, Buf0), @@ -111,15 +112,15 @@ handle_overload_info(_, _) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec call(partition_id(), {txn, #interdc_txn{}} | {log_reader_resp, binary()}) -> ok. +-spec call(partition_id(), {txn, interdc_txn()} | {log_reader_resp, binary()}) -> ok. call(Partition, Request) -> dc_utilities:call_local_vnode(Partition, inter_dc_sub_vnode_master, Request). --spec get_buf(dcid(), #state{}) -> #inter_dc_sub_buf{}. +-spec get_buf(dcid(), state()) -> inter_dc_sub_buf(). get_buf(DCID, State) -> case dict:find(DCID, State#state.buffer_fsms) of {ok, Buf} -> Buf; error -> inter_dc_sub_buf:new_state({DCID, State#state.partition}) end. --spec set_buf(dcid(), #inter_dc_sub_buf{}, #state{}) -> #state{}. +-spec set_buf(dcid(), inter_dc_sub_buf(), state()) -> state(). set_buf(DCID, Buf, State) -> State#state{buffer_fsms = dict:store(DCID, Buf, State#state.buffer_fsms)}. diff --git a/src/inter_dc_txn.erl b/src/inter_dc_txn.erl index 1dc97e9f4..1bc4ab7c8 100644 --- a/src/inter_dc_txn.erl +++ b/src/inter_dc_txn.erl @@ -45,7 +45,7 @@ %% Functions --spec from_ops([#log_record{}], partition_id(), #op_number{} | none) -> #interdc_txn{}. +-spec from_ops([log_record()], partition_id(), op_number() | none) -> interdc_txn(). from_ops(Ops, Partition, PrevLogOpId) -> LastOp = lists:last(Ops), CommitPld = LastOp#log_record.log_operation, @@ -60,7 +60,7 @@ from_ops(Ops, Partition, PrevLogOpId) -> timestamp = CommitTime }. --spec ping(partition_id(), #op_number{} | none, non_neg_integer()) -> #interdc_txn{}. +-spec ping(partition_id(), op_number() | none, non_neg_integer()) -> interdc_txn(). ping(Partition, PrevLogOpId, Timestamp) -> #interdc_txn{ dcid = dc_meta_data_utilities:get_my_dc_id(), partition = Partition, @@ -70,7 +70,7 @@ ping(Partition, PrevLogOpId, Timestamp) -> #interdc_txn{ timestamp = Timestamp }. --spec last_log_opid(#interdc_txn{}) -> #op_number{}. +-spec last_log_opid(interdc_txn()) -> op_number(). last_log_opid(Txn = #interdc_txn{log_records = Ops, prev_log_opid = LogOpId}) -> case is_ping(Txn) of true -> LogOpId; @@ -81,24 +81,24 @@ last_log_opid(Txn = #interdc_txn{log_records = Ops, prev_log_opid = LogOpId}) -> LastOp#log_record.op_number end. --spec is_local(#interdc_txn{}) -> boolean(). +-spec is_local(interdc_txn()) -> boolean(). is_local(#interdc_txn{dcid = DCID}) -> DCID == dc_meta_data_utilities:get_my_dc_id(). --spec is_ping(#interdc_txn{}) -> boolean(). +-spec is_ping(interdc_txn()) -> boolean(). is_ping(#interdc_txn{log_records = Ops}) -> Ops == []. --spec ops_by_type(#interdc_txn{}, any()) -> [#log_record{}]. +-spec ops_by_type(interdc_txn(), any()) -> [log_record()]. ops_by_type(#interdc_txn{log_records = Ops}, Type) -> F = fun(Op) -> Type == Op#log_record.log_operation#log_operation.op_type end, lists:filter(F, Ops). --spec to_bin(#interdc_txn{}) -> binary(). +-spec to_bin(interdc_txn()) -> binary(). to_bin(Txn = #interdc_txn{partition = P}) -> Prefix = partition_to_bin(P), Msg = term_to_binary(Txn), <>. --spec from_bin(binary()) -> #interdc_txn{}. +-spec from_bin(binary()) -> interdc_txn(). from_bin(Bin) -> L = byte_size(Bin), Msg = binary_part(Bin, {?PARTITION_BYTE_LENGTH, L - ?PARTITION_BYTE_LENGTH}), diff --git a/src/log_txn_assembler.erl b/src/log_txn_assembler.erl index 56c48616c..adde5b43b 100644 --- a/src/log_txn_assembler.erl +++ b/src/log_txn_assembler.erl @@ -40,15 +40,15 @@ %% State -record(state, { - op_buffer :: dict:dict(txid(), [#log_record{}]) + op_buffer :: dict:dict(txid(), [log_record()]) }). - +-type state() :: #state{}. %%%% API --------------------------------------------------------------------+ --spec new_state() -> #state{}. +-spec new_state() -> state(). new_state() -> #state{op_buffer = dict:new()}. --spec process(#log_record{}, #state{}) -> {{ok, [#log_record{}]} | none, #state{}}. +-spec process(log_record(), state()) -> {{ok, [log_record()]} | none, state()}. process(LogRecord, State) -> Payload = LogRecord#log_record.log_operation, TxId = Payload#log_operation.tx_id, @@ -59,10 +59,10 @@ process(LogRecord, State) -> _ -> {none, State#state{op_buffer = dict:store(TxId, NewTxnBuf, State#state.op_buffer)}} end. --spec process_all([#log_record{}], #state{}) -> {[[#log_record{}]], #state{}}. +-spec process_all([log_record()], state()) -> {[[log_record()]], state()}. process_all(LogRecords, State) -> process_all(LogRecords, [], State). --spec process_all([#log_record{}], [[#log_record{}]], #state{}) -> {[[#log_record{}]], #state{}}. +-spec process_all([log_record()], [[log_record()]], state()) -> {[[log_record()]], state()}. process_all([], Acc, State) -> {Acc, State}; process_all([H|T], Acc, State) -> {Result, NewState} = process(H, State), @@ -74,7 +74,7 @@ process_all([H|T], Acc, State) -> %%%% Methods ----------------------------------------------------------------+ --spec find_or_default(#tx_id{}, any(), dict:dict()) -> any(). +-spec find_or_default(txid(), any(), dict:dict()) -> any(). find_or_default(Key, Default, Dict) -> case dict:find(Key, Dict) of {ok, Val} -> Val; diff --git a/src/log_utilities.erl b/src/log_utilities.erl index 5943865f0..48a57874a 100644 --- a/src/log_utilities.erl +++ b/src/log_utilities.erl @@ -125,7 +125,7 @@ log_record_version() -> ?LOG_RECORD_VERSION. %% to a different version if necessary %% Checked when loading the log from disk, or %% when log messages are received from another DC --spec check_log_record_version(#log_record{}) -> #log_record{}. +-spec check_log_record_version(log_record()) -> log_record(). check_log_record_version(LogRecord) -> %% Only support one version for now ?LOG_RECORD_VERSION = LogRecord#log_record.version, diff --git a/src/logging_vnode.erl b/src/logging_vnode.erl index d3ec68b48..6aa30a663 100644 --- a/src/logging_vnode.erl +++ b/src/logging_vnode.erl @@ -139,7 +139,7 @@ read(Node, Log) -> ?LOGGING_MASTER). %% @doc Sends an `append' asyncrhonous command to the Logs in `Preflist' --spec asyn_append(index_node(), key(), #log_operation{}, sender()) -> ok. +-spec asyn_append(index_node(), key(), log_operation(), sender()) -> ok. asyn_append(IndexNode, Log, LogOperation, ReplyTo) -> riak_core_vnode_master:command(IndexNode, {append, Log, LogOperation, ?SYNC_LOG}, @@ -147,7 +147,7 @@ asyn_append(IndexNode, Log, LogOperation, ReplyTo) -> ?LOGGING_MASTER). %% @doc synchronous append operation payload --spec append(index_node(), key(), #log_operation{}) -> {ok, op_id()} | {error, term()}. +-spec append(index_node(), key(), log_operation()) -> {ok, op_id()} | {error, term()}. append(IndexNode, LogId, LogOperation) -> riak_core_vnode_master:sync_command(IndexNode, {append, LogId, LogOperation, false}, @@ -156,7 +156,7 @@ append(IndexNode, LogId, LogOperation) -> %% @doc synchronous append operation payload %% If enabled in antidote.hrl will ensure item is written to disk --spec append_commit(index_node(), key(), #log_operation{}) -> {ok, op_id()} | {error, term()}. +-spec append_commit(index_node(), key(), log_operation()) -> {ok, op_id()} | {error, term()}. append_commit(IndexNode, LogId, Payload) -> riak_core_vnode_master:sync_command(IndexNode, {append, LogId, Payload, is_sync_log()}, @@ -165,7 +165,7 @@ append_commit(IndexNode, LogId, Payload) -> %% @doc synchronous append list of log records (note a log record is a payload (log_operation) with an operation number) %% The IsLocal flag indicates if the operations in the transaction were handled by the local or remote DC. --spec append_group(index_node(), key(), [#log_record{}], boolean()) -> {ok, op_id()} | {error, term()}. +-spec append_group(index_node(), key(), [log_record()], boolean()) -> {ok, op_id()} | {error, term()}. append_group(IndexNode, LogId, LogRecordList, IsLocal) -> riak_core_vnode_master:sync_command(IndexNode, {append_group, LogId, LogRecordList, IsLocal, is_sync_log()}, @@ -173,7 +173,7 @@ append_group(IndexNode, LogId, LogRecordList, IsLocal) -> infinity). %% @doc asynchronous append list of operations --spec asyn_append_group(index_node(), key(), [#log_record{}], boolean()) -> ok. +-spec asyn_append_group(index_node(), key(), [log_record()], boolean()) -> ok. asyn_append_group(IndexNode, LogId, LogRecordList, IsLocal) -> riak_core_vnode_master:command(IndexNode, {append_group, LogId, LogRecordList, IsLocal, is_sync_log()}, @@ -183,7 +183,7 @@ asyn_append_group(IndexNode, LogId, LogRecordList, IsLocal) -> %% @doc given the MaxSnapshotTime and the type, this method fetches from the log the %% desired operations smaller than the time so a new snapshot can be created. -spec get_up_to_time(index_node(), key(), vectorclock(), type(), key()) -> - #snapshot_get_response{} | {error, reason()}. + snapshot_get_response() | {error, reason()}. get_up_to_time(IndexNode, LogId, MaxSnapshotTime, Type, Key) -> riak_core_vnode_master:sync_command(IndexNode, {get, LogId, undefined, MaxSnapshotTime, Type, Key}, @@ -192,9 +192,9 @@ get_up_to_time(IndexNode, LogId, MaxSnapshotTime, Type, Key) -> %% @doc given the MinSnapshotTime and the type, this method fetchss from the log the %% desired operations so a new snapshot can be created. -%% It returns a #snapshot_get_response{} record which is defined in antidote.hrl +%% It returns a snapshot_get_response() record which is defined in antidote.hrl -spec get_from_time(index_node(), key(), vectorclock(), type(), key()) -> - #snapshot_get_response{} | {error, reason()}. + snapshot_get_response() | {error, reason()}. get_from_time(IndexNode, LogId, MinSnapshotTime, Type, Key) -> riak_core_vnode_master:sync_command(IndexNode, {get, LogId, MinSnapshotTime, undefined, Type, Key}, @@ -205,7 +205,7 @@ get_from_time(IndexNode, LogId, MinSnapshotTime, Type, Key) -> %% desired operations so a new snapshot can be created. %% It returns a #log_get_response{} record which is defined in antidote.hrl -spec get_range(index_node(), key(), vectorclock(), vectorclock(), type(), key()) -> - #snapshot_get_response{} | {error, reason()}. + snapshot_get_response() | {error, reason()}. get_range(IndexNode, LogId, MinSnapshotTime, MaxSnapshotTime, Type, Key) -> riak_core_vnode_master:sync_command(IndexNode, {get, LogId, MinSnapshotTime, MaxSnapshotTime, Type, Key}, @@ -221,9 +221,9 @@ get_range(IndexNode, LogId, MinSnapshotTime, MaxSnapshotTime, Type, Key) -> %% the third is the location of the next chunk %% Otherwise if the end of the file is reached it returns a tuple %% where the first elelment is 'eof' and the second is a dict of commited operations --spec get_all(index_node(), log_id(), start | disk_log:continuation(), dict:dict(key(), [{non_neg_integer(), #clocksi_payload{}}])) -> - {disk_log:continuation(), dict:dict(txid(), [any_log_payload()]), dict:dict(key(), [{non_neg_integer(), #clocksi_payload{}}])} - | {error, reason()} | {eof, dict:dict(key(), [{non_neg_integer(), #clocksi_payload{}}])}. +-spec get_all(index_node(), log_id(), start | disk_log:continuation(), dict:dict(key(), [{non_neg_integer(), clocksi_payload()}])) -> + {disk_log:continuation(), dict:dict(txid(), [any_log_payload()]), dict:dict(key(), [{non_neg_integer(), clocksi_payload()}])} + | {error, reason()} | {eof, dict:dict(key(), [{non_neg_integer(), clocksi_payload()}])}. get_all(IndexNode, LogId, Continuation, PrevOps) -> riak_core_vnode_master:sync_command(IndexNode, {get_all, LogId, Continuation, PrevOps}, ?LOGGING_MASTER, @@ -385,8 +385,8 @@ handle_command({read_from, LogId, _From}, _Sender, %% OpId: Unique operation id %% Output: {ok, {vnode_id, op_id}} | {error, Reason} %% -%% -spec handle_command({append, log_id(), #log_operation{}, boolean()}, pid(), #state{}) -> -%% {reply, {ok, #op_number{}} #state{}} | {reply, error(), #state{}}. +%% -spec handle_command({append, log_id(), log_operation(), boolean()}, pid(), #state{}) -> +%% {reply, {ok, op_number()} #state{}} | {reply, error(), #state{}}. handle_command({append, LogId, LogOperation, Sync}, _Sender, #state{logs_map=Map, op_id_table=OpIdTable, @@ -445,8 +445,8 @@ handle_command({append, LogId, LogOperation, Sync}, _Sender, %% That is why IsLocal is hard coded to false %% Might want to support appending groups of local operations in the future %% for efficiency -%% -spec handle_command({append_group, log_id(), [#log_record{}], false, boolean()}, pid(), #state{}) -> -%% {reply, {ok, #op_number{}} #state{}} | {reply, error(), #state{}}. +%% -spec handle_command({append_group, log_id(), [log_record()], false, boolean()}, pid(), #state{}) -> +%% {reply, {ok, op_number()} #state{}} | {reply, error(), #state{}}. handle_command({append_group, LogId, LogRecordList, _IsLocal = false, Sync}, _Sender, #state{logs_map=Map, op_id_table=OpIdTable, @@ -617,7 +617,7 @@ get_last_op_from_log(Log, Continuation, ClockTable, PrevMaxVector) -> %% This is called when the vnode starts and loads into the cache %% the id of the last operation appended to the log, so that new ops will %% be assigned correct ids (after crash and restart) --spec get_max_op_numbers([{log_id(), #log_record{}}], cache_id(), vectorclock()) -> vectorclock(). +-spec get_max_op_numbers([{log_id(), log_record()}], cache_id(), vectorclock()) -> vectorclock(). get_max_op_numbers([], _ClockTable, MaxVector) -> MaxVector; get_max_op_numbers([{LogId, LogRecord}|Rest], ClockTable, PrevMaxVector) -> @@ -645,7 +645,7 @@ get_max_op_numbers([{LogId, LogRecord}|Rest], ClockTable, PrevMaxVector) -> get_max_op_numbers(Rest, ClockTable, NewMaxVector). %% After appeded an operation to the log, increment the op id --spec update_ets_op_id({log_id(), dcid()} | {log_id(), bucket(), dcid()}, #op_number{}, cache_id()) -> true. +-spec update_ets_op_id({log_id(), dcid()} | {log_id(), bucket(), dcid()}, op_number(), cache_id()) -> true. update_ets_op_id(Key, NewOp, ClockTable) -> #op_number{local = Num, global = GlobalNum} = NewOp, case ets:lookup(ClockTable, Key) of @@ -668,13 +668,13 @@ update_ets_op_id(Key, NewOp, ClockTable) -> snapshot_time(), snapshot_time(), Ops :: dict:dict(txid(), [any_log_payload()]), - CommittedOpsDict :: dict:dict(key(), [#clocksi_payload{}]), + CommittedOpsDict :: dict:dict(key(), [clocksi_payload()]), load_all | load_per_chunk) -> {disk_log:continuation(), dict:dict(txid(), [any_log_payload()]), - dict:dict(key(), [{non_neg_integer(), #clocksi_payload{}}])} | + dict:dict(key(), [{non_neg_integer(), clocksi_payload()}])} | {error, reason()} | - {eof, dict:dict(key(), [{non_neg_integer(), #clocksi_payload{}}])}. + {eof, dict:dict(key(), [{non_neg_integer(), clocksi_payload()}])}. get_ops_from_log(Log, Key, Continuation, MinSnapshotTime, MaxSnapshotTime, Ops, CommittedOpsDict, LoadAll) -> case disk_log:chunk(Log, Continuation) of eof -> @@ -713,9 +713,9 @@ finish_op_load(CommittedOpsDict) -> %% If key is undefined then is returns all records for all keys %% It returns a dict corresponding to all the ops matching Key and %% a list of the committed operations for that key which have a smaller commit time than MinSnapshotTime. --spec filter_terms_for_key([{non_neg_integer(), #log_record{}}], key(), snapshot_time(), snapshot_time(), - dict:dict(txid(), [any_log_payload()]), dict:dict(key(), [#clocksi_payload{}])) -> - {dict:dict(txid(), [any_log_payload()]), dict:dict(key(), [#clocksi_payload{}])}. +-spec filter_terms_for_key([{non_neg_integer(), log_record()}], key(), snapshot_time(), snapshot_time(), + dict:dict(txid(), [any_log_payload()]), dict:dict(key(), [clocksi_payload()])) -> + {dict:dict(txid(), [any_log_payload()]), dict:dict(key(), [clocksi_payload()])}. filter_terms_for_key([], _Key, _MinSnapshotTime, _MaxSnapshotTime, Ops, CommittedOpsDict) -> {Ops, CommittedOpsDict}; filter_terms_for_key([{_, LogRecord}|T], Key, MinSnapshotTime, MaxSnapshotTime, Ops, CommittedOpsDict) -> @@ -730,9 +730,9 @@ filter_terms_for_key([{_, LogRecord}|T], Key, MinSnapshotTime, MaxSnapshotTime, filter_terms_for_key(T, Key, MinSnapshotTime, MaxSnapshotTime, Ops, CommittedOpsDict) end. --spec handle_update(txid(), #update_log_payload{}, [{non_neg_integer(), #log_record{}}], key(), snapshot_time() | undefined, - snapshot_time(), dict:dict(txid(), [any_log_payload()]), dict:dict(key(), [#clocksi_payload{}])) -> - {dict:dict(txid(), [any_log_payload()]), dict:dict(key(), [#clocksi_payload{}])}. +-spec handle_update(txid(), update_log_payload(), [{non_neg_integer(), log_record()}], key(), snapshot_time() | undefined, + snapshot_time(), dict:dict(txid(), [any_log_payload()]), dict:dict(key(), [clocksi_payload()])) -> + {dict:dict(txid(), [any_log_payload()]), dict:dict(key(), [clocksi_payload()])}. handle_update(TxId, OpPayload, T, Key, MinSnapshotTime, MaxSnapshotTime, Ops, CommittedOpsDict) -> #update_log_payload{key = Key1} = OpPayload, case (Key == {key, Key1}) or (Key == undefined) of @@ -743,9 +743,9 @@ handle_update(TxId, OpPayload, T, Key, MinSnapshotTime, MaxSnapshotTime, Ops, C filter_terms_for_key(T, Key, MinSnapshotTime, MaxSnapshotTime, Ops, CommittedOpsDict) end. --spec handle_commit(txid(), #commit_log_payload{}, [{non_neg_integer(), #log_record{}}], key(), snapshot_time() | undefined, - snapshot_time(), dict:dict(txid(), [any_log_payload()]), dict:dict(key(), [#clocksi_payload{}])) -> - {dict:dict(txid(), [any_log_payload()]), dict:dict(key(), [#clocksi_payload{}])}. +-spec handle_commit(txid(), commit_log_payload(), [{non_neg_integer(), log_record()}], key(), snapshot_time() | undefined, + snapshot_time(), dict:dict(txid(), [any_log_payload()]), dict:dict(key(), [clocksi_payload()])) -> + {dict:dict(txid(), [any_log_payload()]), dict:dict(key(), [clocksi_payload()])}. handle_commit(TxId, OpPayload, T, Key, MinSnapshotTime, MaxSnapshotTime, Ops, CommittedOpsDict) -> #commit_log_payload{commit_time = {DcId, TxCommitTime}, snapshot_time = SnapshotTime} = OpPayload, case dict:find(TxId, Ops) of @@ -969,7 +969,7 @@ fold_log(Log, Continuation, F, Acc) -> %% Payload: The payload of the operation to insert %% Return: {ok, OpId} | {error, Reason} %% --spec insert_log_record(log(), log_id(), #log_record{}, boolean()) -> {ok, #op_number{}} | {error, reason()}. +-spec insert_log_record(log(), log_id(), log_record(), boolean()) -> {ok, op_number()} | {error, reason()}. insert_log_record(Log, LogId, LogRecord, EnableLogging) -> Result = case EnableLogging of true -> @@ -994,7 +994,7 @@ insert_log_record(Log, LogId, LogRecord, EnableLogging) -> preflist_member(Partition, Preflist) -> lists:any(fun({P, _}) -> P =:= Partition end, Preflist). --spec get_op_id(cache_id(), {log_id(), dcid()} | {log_id(), bucket(), dcid()}) -> #op_number{}. +-spec get_op_id(cache_id(), {log_id(), dcid()} | {log_id(), bucket(), dcid()}) -> op_number(). get_op_id(ClockTable, {LogId, DCID}) -> case ets:lookup(ClockTable, {LogId, DCID}) of [] -> diff --git a/src/materializer_vnode.erl b/src/materializer_vnode.erl index 7f807a84f..366181f25 100644 --- a/src/materializer_vnode.erl +++ b/src/materializer_vnode.erl @@ -77,14 +77,14 @@ handle_overload_info/2 ]). --type op_and_id() :: {non_neg_integer(), #clocksi_payload{}}. +-type op_and_id() :: {non_neg_integer(), clocksi_payload()}. -record(state, { partition :: partition_id(), ops_cache :: cache_id(), snapshot_cache :: cache_id(), is_ready :: boolean() }). - +-type state() :: #state{}. %%---------------- API Functions -------------------%% start_vnode(I) -> @@ -111,7 +111,7 @@ update(Key, DownstreamOp) -> %%@doc write snapshot to cache for future read, snapshots are stored %% one at a time into the ets table --spec store_ss(key(), #materialized_snapshot{}, snapshot_time()) -> ok. +-spec store_ss(key(), materialized_snapshot(), snapshot_time()) -> ok. store_ss(Key, Snapshot, CommitTime) -> IndexNode = log_utilities:get_key_partition(Key), riak_core_vnode_master:command(IndexNode, {store_ss, Key, Snapshot, CommitTime}, @@ -285,7 +285,7 @@ terminate(_Reason, _State=#state{ops_cache=OpsCache, snapshot_cache=SnapshotCach get_cache_name(Partition, Base) -> list_to_atom(atom_to_list(Base) ++ "-" ++ integer_to_list(Partition)). --spec load_from_log_to_tables(partition_id(), #state{}) -> ok | {error, reason()}. +-spec load_from_log_to_tables(partition_id(), state()) -> ok | {error, reason()}. load_from_log_to_tables(Partition, State) -> LogId = [Partition], Node = {Partition, log_utilities:get_my_node(Partition)}, @@ -295,7 +295,7 @@ load_from_log_to_tables(Partition, State) -> log_id(), start | disk_log:continuation(), dict:dict(txid(), [any_log_payload()]), - #state{}) -> + state()) -> ok | {error, reason()}. loop_until_loaded(Node, LogId, Continuation, Ops, State) -> case logging_vnode:get_all(Node, LogId, Continuation, Ops) of @@ -309,7 +309,7 @@ loop_until_loaded(Node, LogId, Continuation, Ops, State) -> ok end. --spec load_ops(dict:dict(key(), [{non_neg_integer(), clocksi_payload()}]), #state{}) -> true. +-spec load_ops(dict:dict(key(), [{non_neg_integer(), clocksi_payload()}]), state()) -> true. load_ops(OpsDict, State) -> dict:fold(fun(Key, CommittedOps, _Acc) -> lists:foreach(fun({_OpId, Op}) -> @@ -338,7 +338,7 @@ open_table(Partition, Name) -> end. --spec internal_store_ss(key(), #materialized_snapshot{}, snapshot_time(), boolean(), #state{}) -> boolean(). +-spec internal_store_ss(key(), materialized_snapshot(), snapshot_time(), boolean(), state()) -> boolean(). internal_store_ss(Key, Snapshot = #materialized_snapshot{last_op_id = NewOpId}, CommitTime, ShouldGc, State = #state{snapshot_cache=SnapshotCache}) -> SnapshotDict = case ets:lookup(SnapshotCache, Key) of [] -> @@ -365,7 +365,7 @@ internal_store_ss(Key, Snapshot = #materialized_snapshot{last_op_id = NewOpId}, %% @doc This function takes care of reading. It is implemented here for not blocking the %% vnode when the write function calls it. That is done for garbage collection. --spec internal_read(key(), type(), snapshot_time(), txid() | ignore, clocksi_readitem_server:read_property_list(), boolean(), #state{}) +-spec internal_read(key(), type(), snapshot_time(), txid() | ignore, clocksi_readitem_server:read_property_list(), boolean(), state()) -> {ok, snapshot()} | {error, no_snapshot}. internal_read(Key, Type, MinSnapshotTime, TxId, _PropertyList, ShouldGc, State) -> @@ -379,7 +379,7 @@ internal_read(Key, Type, MinSnapshotTime, TxId, _PropertyList, ShouldGc, State) %% %% If there's no in-memory suitable snapshot, it will fetch it from the replication log. %% --spec get_from_snapshot_cache(txid() | ignore, key(), type(), snapshot_time(), #state{}) -> #snapshot_get_response{}. +-spec get_from_snapshot_cache(txid() | ignore, key(), type(), snapshot_time(), state()) -> snapshot_get_response(). get_from_snapshot_cache(TxId, Key, Type, MinSnaphsotTime, State = #state{ ops_cache=OpsCache, @@ -412,7 +412,7 @@ get_from_snapshot_cache(TxId, Key, Type, MinSnaphsotTime, State = #state{ end end. --spec get_from_snapshot_log(key(), type(), snapshot_time()) -> #snapshot_get_response{}. +-spec get_from_snapshot_log(key(), type(), snapshot_time()) -> snapshot_get_response(). get_from_snapshot_log(Key, Type, SnapshotTime) -> LogId = log_utilities:get_logid_from_key(Key), Partition = log_utilities:get_key_partition(Key), @@ -422,7 +422,7 @@ get_from_snapshot_log(Key, Type, SnapshotTime) -> %% %% If `ShouldGC' is true, it will try to prune the in-memory cache before inserting. %% --spec store_snapshot(txid() | ignore, key(), #materialized_snapshot{}, snapshot_time(), boolean(), #state{}) -> ok. +-spec store_snapshot(txid() | ignore, key(), materialized_snapshot(), snapshot_time(), boolean(), state()) -> ok. store_snapshot(TxId, Key, Snapshot, Time, ShouldGC, State) -> %% AB: Why don't we need to synchronize through the gen_server if the TxId is ignore?? case TxId of @@ -434,8 +434,8 @@ store_snapshot(TxId, Key, Snapshot, Time, ShouldGC, State) -> end. %% @doc Given a snapshot from the cache, update it from the ops cache. --spec update_snapshot_from_cache({{snapshot_time() | ignore, #materialized_snapshot{}}, boolean()}, key(), cache_id()) - -> #snapshot_get_response{}. +-spec update_snapshot_from_cache({{snapshot_time() | ignore, materialized_snapshot()}, boolean()}, key(), cache_id()) + -> snapshot_get_response(). update_snapshot_from_cache(SnapshotResponse, Key, OpsCache) -> {{SnapshotCommitTime, LatestSnapshot}, IsFirst} = SnapshotResponse, @@ -463,7 +463,7 @@ fetch_updates_from_cache(OpsCache, Key) -> {CachedOps, Length} end. --spec materialize_snapshot(txid() | ignore, key(), type(), snapshot_time(), boolean(), #state{}, #snapshot_get_response{}) +-spec materialize_snapshot(txid() | ignore, key(), type(), snapshot_time(), boolean(), state(), snapshot_get_response()) -> {ok, snapshot_time()} | {error, reason()}. materialize_snapshot(_TxId, _Key, _Type, _SnapshotTime, _ShouldGC, _State, #snapshot_get_response{ @@ -511,7 +511,7 @@ materialize_snapshot(TxId, Key, Type, SnapshotTime, ShouldGC, State, SnapshotRes %% @doc Operation to insert a Snapshot in the cache and start %% Garbage collection triggered by reads. -spec snapshot_insert_gc(key(), vector_orddict:vector_orddict(), - boolean(), #state{}) -> true. + boolean(), state()) -> true. snapshot_insert_gc(Key, SnapshotDict, ShouldGc, #state{snapshot_cache = SnapshotCache, ops_cache = OpsCache})-> %% Perform the garbage collection when the size of the snapshot dict passed the threshold %% or when a GC is forced (a GC is forced after every ?OPS_THRESHOLD ops are inserted into the cache) @@ -589,7 +589,7 @@ prune_ops({Len, OpsTuple}, Threshold)-> %% of the remaining operations and the size of that list %% It is used during garbage collection to filter out operations that are older than any %% of the cached snapshots --spec check_filter(fun(({non_neg_integer(), #clocksi_payload{}}) -> boolean()), non_neg_integer(), non_neg_integer(), +-spec check_filter(fun(({non_neg_integer(), clocksi_payload()}) -> boolean()), non_neg_integer(), non_neg_integer(), non_neg_integer(), tuple(), non_neg_integer(), [{non_neg_integer(), op_and_id()}]) -> {non_neg_integer(), [{non_neg_integer(), op_and_id()}]}. check_filter(_Fun, Id, Last, _NewId, _Tuple, NewSize, NewOps) when (Id == Last) -> @@ -618,7 +618,7 @@ deconstruct_opscache_entry(Tuple) -> {Key, Length, OpId, ListLen, Tuple}. %% @doc Insert an operation and optionally start garbage collection triggered by writes. --spec op_insert_gc(key(), clocksi_payload(), #state{}) -> true. +-spec op_insert_gc(key(), clocksi_payload(), state()) -> true. op_insert_gc(Key, DownstreamOp, State = #state{ops_cache = OpsCache}) -> %% Check whether there is an ops cache entry for the key case ets:member(OpsCache, Key) of diff --git a/test/multidc/pb_client_cluster_management_SUITE.erl b/test/multidc/pb_client_cluster_management_SUITE.erl new file mode 100644 index 000000000..2235cf090 --- /dev/null +++ b/test/multidc/pb_client_cluster_management_SUITE.erl @@ -0,0 +1,154 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright <2013-2018> < +%% Technische Universität Kaiserslautern, Germany +%% Université Pierre et Marie Curie / Sorbonne-Université, France +%% Universidade NOVA de Lisboa, Portugal +%% Université catholique de Louvain (UCL), Belgique +%% INESC TEC, Portugal +%% > +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either expressed or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% List of the contributors to the development of Antidote: see AUTHORS file. +%% Description and complete License: see LICENSE file. +%% ------------------------------------------------------------------- + +-module(pb_client_cluster_management_SUITE). + +%% common_test callbacks +-export([ + init_per_suite/1, + end_per_suite/1, + init_per_testcase/2, + end_per_testcase/2, + all/0]). + +%% tests +-export([ + setup_cluster_test/1 +]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(ADDRESS, "localhost"). +-define(PORT, 10017). +-define(BUCKET_BIN, term_to_binary(test_utils:bucket(pb_client_bucket))). + + +init_per_suite(_Config) -> + []. + + +end_per_suite(Config) -> + Config. + + +init_per_testcase(_Case, Config) -> + Config. + + +end_per_testcase(Name, _) -> + ct:print("[ OK ] ~p", [Name]), + ok. + + +all() -> [ + setup_cluster_test +]. + +-spec send_pb_message(pid(), antidote_pb_codec:request()) -> antidote_pb_codec:response_in(). +send_pb_message(Pid, Message) -> + EncMsg = antidote_pb_codec:encode_request(Message), + ResponseRaw = antidotec_pb_socket:call_infinity(Pid, {req, EncMsg, infinity}), + antidote_pb_codec:decode_response(ResponseRaw). + + +%% Single object rea +setup_cluster_test(Config) -> + NodeNames = [clusterdev1, clusterdev2, clusterdev3, clusterdev4], + Nodes = test_utils:pmap(fun(Node) -> test_utils:start_node(Node, Config) end, NodeNames), + [Node1, Node2, Node3, Node4] = Nodes, + + ct:pal("Check1"), + + % join cluster 1: + P1 = spawn_link(fun() -> + {ok, Pb} = antidotec_pb_socket:start(?ADDRESS, test_utils:web_ports(clusterdev1) + 2), + ct:pal("joining clusterdev1, clusterdev2"), + Response = send_pb_message(Pb, {create_dc, [Node1, Node2]}), + ct:pal("joined clusterdev1, clusterdev2: ~p", [Response]), + {operation_response,ok} = Response, + _Disconnected = antidotec_pb_socket:stop(Pb) + end), + + % join cluster 2: + P2 = spawn_link(fun() -> + {ok, Pb} = antidotec_pb_socket:start(?ADDRESS, test_utils:web_ports(clusterdev3) + 2), + ct:pal("joining clusterdev3, clusterdev4"), + Response = send_pb_message(Pb, {create_dc, [Node3, Node4]}), + ct:pal("joined clusterdev3, clusterdev4: ~p", [Response]), + {operation_response,ok} = Response, + _Disconnected = antidotec_pb_socket:stop(Pb) + end), + + wait_for_process(P1), + wait_for_process(P2), + + % get descriptor of cluster 2: + {ok, Pb3} = antidotec_pb_socket:start(?ADDRESS, test_utils:web_ports(clusterdev3) + 2), + ct:pal("get_connection_descriptor clusterdev3"), + {get_connection_descriptor_resp, {ok, DescriptorBin3}} = send_pb_message(Pb3, get_connection_descriptor), + Descriptor3 = binary_to_term(DescriptorBin3), + ct:pal("get_connection_descriptor clusterdev3: ~p", [Descriptor3]), + + + % use descriptor to connect both dcs + {ok, Pb1} = antidotec_pb_socket:start(?ADDRESS, test_utils:web_ports(clusterdev1) + 2), + ct:pal("connecting clusters"), + Response1 = send_pb_message(Pb1, {connect_to_dcs, [DescriptorBin3]}), + ct:pal("connected clusters: ~p", [Response1]), + ?assertEqual({operation_response, ok}, Response1), + + + + Bucket = ?BUCKET_BIN, + Bound_object = {<<"key1">>, antidote_crdt_counter_pn, Bucket}, + + % write counter on clusterdev4: + {ok, Pb4} = antidotec_pb_socket:start(?ADDRESS, test_utils:web_ports(clusterdev4) + 2), + {ok, TxId4} = antidotec_pb:start_transaction(Pb3, ignore, []), + ok = antidotec_pb:update_objects(Pb3, [{Bound_object, increment, 4711}], TxId4), + {ok, CommitTime} = antidotec_pb:commit_transaction(Pb3, TxId4), + + % read counter on clusterdev1 + {ok, TxId1} = antidotec_pb:start_transaction(Pb1, CommitTime, []), + {ok, [Counter]} = antidotec_pb:read_objects(Pb1, [Bound_object], TxId1), + {ok, _} = antidotec_pb:commit_transaction(Pb1, TxId1), + + ?assertMatch(true, antidotec_counter:is_type(Counter)), + ?assertEqual(4711, antidotec_counter:value(Counter)), + + _Disconnected1 = antidotec_pb_socket:stop(Pb1), + _Disconnected3 = antidotec_pb_socket:stop(Pb3), + _Disconnected3 = antidotec_pb_socket:stop(Pb4). + + +wait_for_process(P) -> + MonitorRef = monitor(process, P), + receive + {_Tag, MonitorRef, _Type, _Object, _Info} -> ok + end. diff --git a/test/release_test.escript b/test/release_test.escript index 335850ff8..01336c029 100755 --- a/test/release_test.escript +++ b/test/release_test.escript @@ -5,7 +5,7 @@ -define(PORT, 8087). load(Dep) -> - Path = filename:dirname(escript:script_name()) ++ "/../_build/test/lib/" ++ Dep ++ "/ebin", + Path = filename:dirname(escript:script_name()) ++ "/../_build/default/lib/" ++ Dep ++ "/ebin", case code:add_pathz(Path) of true -> true; @@ -25,7 +25,7 @@ test_transaction(Tries) -> Key = <<"release_test_key">>, Bound_object = {Key, antidote_crdt_counter_pn, <<"release_test_key_bucket">>}, io:format("Starting Test transaction~n"), - case antidotec_pb:start_transaction(Pid, ignore, []) of + case antidotec_pb:start_transaction(Pid, ignore) of {error, Reason} when Tries > 0 -> io:format("Could not start transaction: ~p~n", [Reason]), timer:sleep(1000), diff --git a/test/singledc/pb_client_SUITE.erl b/test/singledc/pb_client_SUITE.erl index 1d8a34eae..6e239cd66 100644 --- a/test/singledc/pb_client_SUITE.erl +++ b/test/singledc/pb_client_SUITE.erl @@ -194,7 +194,7 @@ pb_test_set_read_write(_Config) -> {ok, _} = antidotec_pb:commit_transaction(Pid, TxId), %% Read committed updated - {ok, Tx2} = antidotec_pb:start_transaction(Pid, term_to_binary(ignore), []), + {ok, Tx2} = antidotec_pb:start_transaction(Pid, ignore, []), {ok, [Val]} = antidotec_pb:read_objects(Pid, [Bound_object], Tx2), {ok, _} = antidotec_pb:commit_transaction(Pid, Tx2), diff --git a/test/utils/ct_slave_ext.erl b/test/utils/ct_slave_ext.erl new file mode 100644 index 000000000..5a9b77903 --- /dev/null +++ b/test/utils/ct_slave_ext.erl @@ -0,0 +1,376 @@ +%%-------------------------------------------------------------------- +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2010-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% + +%%---------------------------------------------------------------------- +%% This is an adaptation of ct_slave from the Erlang standard library. +%% It adds the following features: +%% - Configuration to set the working directory of a slave +%%---------------------------------------------------------------------- +-module(ct_slave_ext). + +-export([start/1, start/2, start/3, stop/1, stop/2]). + +-export([slave_started/2, slave_ready/2, monitor_master/1]). + +-record(options, {username, password, boot_timeout, init_timeout, + startup_timeout, startup_functions, monitor_master, + kill_if_fail, erl_flags, env, ssh_port, ssh_opts, + stop_timeout, working_dir}). + +start(Node) -> + start(gethostname(), Node). + +start(_HostOrNode = Node, _NodeOrOpts = Opts) %% match to satiate edoc + when is_list(Opts) -> + start(gethostname(), Node, Opts); + +start(Host, Node) -> + start(Host, Node, []). + +start(Host, Node, Opts) -> + ENode = enodename(Host, Node), + case erlang:is_alive() of + false-> + {error, not_alive, node()}; + true-> + case is_started(ENode) of + false-> + OptionsRec = fetch_options(Opts), + do_start(Host, Node, OptionsRec); + {true, not_connected}-> + {error, started_not_connected, ENode}; + {true, connected}-> + {error, already_started, ENode} + end + end. + +stop(Node) -> + stop(gethostname(), Node). + +stop(_HostOrNode = Node, _NodeOrOpts = Opts) %% match to satiate edoc + when is_list(Opts) -> + stop(gethostname(), Node, Opts); + +stop(Host, Node) -> + stop(Host, Node, []). + +stop(Host, Node, Opts) -> + ENode = enodename(Host, Node), + case is_started(ENode) of + {true, connected}-> + OptionsRec = fetch_options(Opts), + do_stop(ENode, OptionsRec); + {true, not_connected}-> + {error, not_connected, ENode}; + false-> + {error, not_started, ENode} + end. + +%%% fetch an option value from the tagged tuple list with default +get_option_value(Key, OptionList, Default) -> + case lists:keyfind(Key, 1, OptionList) of + false-> + Default; + {Key, Value}-> + Value + end. + +%%% convert option list to the option record, fill all defaults +fetch_options(Options) -> + UserName = get_option_value(username, Options, []), + Password = get_option_value(password, Options, []), + BootTimeout = get_option_value(boot_timeout, Options, 3), + InitTimeout = get_option_value(init_timeout, Options, 1), + StartupTimeout = get_option_value(startup_timeout, Options, 1), + StartupFunctions = get_option_value(startup_functions, Options, []), + Monitor = get_option_value(monitor_master, Options, false), + KillIfFail = get_option_value(kill_if_fail, Options, true), + ErlFlags = get_option_value(erl_flags, Options, []), + EnvVars = get_option_value(env, Options, []), + SSHPort = get_option_value(ssh_port, Options, []), + SSHOpts = get_option_value(ssh_opts, Options, []), + StopTimeout = get_option_value(stop_timeout, Options, 5), + WorkingDir = get_option_value(cd, Options ,false), + #options{username=UserName, password=Password, + boot_timeout=BootTimeout, init_timeout=InitTimeout, + startup_timeout=StartupTimeout, startup_functions=StartupFunctions, + monitor_master=Monitor, kill_if_fail=KillIfFail, + erl_flags=ErlFlags, env=EnvVars, ssh_port=SSHPort, ssh_opts=SSHOpts, + stop_timeout=StopTimeout, working_dir=WorkingDir}. + +% send a message when slave node is started +slave_started(ENode, MasterPid) -> + MasterPid ! {node_started, ENode}, + ok. + +% send a message when slave node has finished startup +slave_ready(ENode, MasterPid) -> + MasterPid ! {node_ready, ENode}, + ok. + +% start monitoring of the master node +monitor_master(MasterNode) -> + spawn(fun() -> monitor_master_int(MasterNode) end). + +% code of the masterdeath-waiter process +monitor_master_int(MasterNode) -> + ct_util:mark_process(), + erlang:monitor_node(MasterNode, true), + receive + {nodedown, MasterNode}-> + init:stop() + end. + +% check if node is listed in the nodes() +is_connected(ENode) -> + [N||N<-nodes(), N==ENode] == [ENode]. + +% check if node is alive (ping and disconnect if pingable) +is_started(ENode) -> + case is_connected(ENode) of + true-> + {true, connected}; + false-> + case net_adm:ping(ENode) of + pang-> + false; + pong-> + erlang:disconnect_node(ENode), + {true, not_connected} + end + end. + +% make a Erlang node name from name and hostname +enodename(Host, Node) -> + case lists:member($@, atom_to_list(Node)) of + true -> + Node; + false -> + list_to_atom(atom_to_list(Node)++"@"++atom_to_list(Host)) + end. + +% performs actual start of the "slave" node +do_start(Host, Node, Options) -> + ENode = enodename(Host, Node), + Functions = + lists:append([[{ct_slave, slave_started, [ENode, self()]}], + Options#options.startup_functions, + [{ct_slave, slave_ready, [ENode, self()]}]]), + Functions2 = if + Options#options.monitor_master-> + [{ct_slave, monitor_master, [node()]}|Functions]; + true-> + Functions + end, + MasterHost = gethostname(), + _ = if + MasterHost == Host -> + spawn_local_node(Node, Options); + true-> + spawn_remote_node(Host, Node, Options) + end, + + BootTimeout = Options#options.boot_timeout, + InitTimeout = Options#options.init_timeout, + StartupTimeout = Options#options.startup_timeout, + Result = case wait_for_node_alive(ENode, BootTimeout) of + pong-> + case test_server:is_cover() of + true -> + MainCoverNode = cover:get_main_node(), + rpc:call(MainCoverNode,cover,start,[ENode]); + false -> + ok + end, + call_functions(ENode, Functions2), + receive + {node_started, ENode}-> + receive + {node_ready, ENode}-> + {ok, ENode} + after StartupTimeout*1000-> + {error, startup_timeout, ENode} + end + after InitTimeout*1000 -> + {error, init_timeout, ENode} + end; + pang-> + {error, boot_timeout, ENode} + end, + _ = case Result of + {ok, ENode}-> + ok; + {error, Timeout, ENode} + when ((Timeout==init_timeout) or (Timeout==startup_timeout)) and + Options#options.kill_if_fail-> + do_stop(ENode); + _-> ok + end, + Result. + +% are we using fully qualified hostnames +long_or_short() -> + case net_kernel:longnames() of + true-> + " -name "; + false-> + " -sname " + end. + +% get the localhost's name, depending on the using name policy +gethostname() -> + Hostname = case net_kernel:longnames() of + true-> + net_adm:localhost(); + _-> + {ok, Name}=inet:gethostname(), + Name + end, + list_to_atom(Hostname). + +% get cmd for starting Erlang +get_cmd(Node, Flags) -> + Cookie = erlang:get_cookie(), + "erl -setcookie "++ atom_to_list(Cookie) ++ + long_or_short() ++ atom_to_list(Node) ++ " " ++ Flags. + +% spawn node locally +spawn_local_node(Node, Options) -> + #options{env=Env,erl_flags=ErlFlags} = Options, + Cmd = get_cmd(Node, ErlFlags), + PortOpds = + [stream, {env, Env}] ++ + case Options#options.working_dir of + false -> []; + Dir -> [{cd, Dir}] + end, + spawn(fun() -> + Port = open_port({spawn, Cmd}, PortOpds), + Cwd = case Options#options.working_dir of + false -> + {ok, D} = file:get_cwd(), + D; + D -> D + end, + log_output(Node, Port, Cwd ++ "/stdio.log") + end). + +log_output(Node, Port, File) -> + receive + {Port, {data, Data}} -> + ct:log("Node ~p output:~n~s", [Node, Data]), + {ok, F} = file:open(File, [append]), + file:write(F, Data), + file:close(F), + log_output(Node, Port, File) + end. + + +% spawn node remotely +spawn_remote_node(Host, Node, Options) -> + #options{username=Username, + password=Password, + erl_flags=ErlFlags, + env=Env, + ssh_port=MaybeSSHPort, + ssh_opts=SSHOpts} = Options, + SSHPort = case MaybeSSHPort of + [] -> 22; % Use default SSH port + A -> A + end, + SSHOptions = case {Username, Password} of + {[], []}-> + []; + {_, []}-> + [{user, Username}]; + {_, _}-> + [{user, Username}, {password, Password}] + end ++ [{silently_accept_hosts, true}] ++ SSHOpts, + {ok, _} = application:ensure_all_started(ssh), + {ok, SSHConnRef} = ssh:connect(atom_to_list(Host), SSHPort, SSHOptions), + {ok, SSHChannelId} = ssh_connection:session_channel(SSHConnRef, infinity), + ssh_setenv(SSHConnRef, SSHChannelId, Env), + ssh_connection:exec(SSHConnRef, SSHChannelId, get_cmd(Node, ErlFlags), infinity). + +ssh_setenv(SSHConnRef, SSHChannelId, [{Var, Value} | Vars]) + when is_list(Var), is_list(Value) -> + success = ssh_connection:setenv(SSHConnRef, SSHChannelId, + Var, Value, infinity), + ssh_setenv(SSHConnRef, SSHChannelId, Vars); +ssh_setenv(_SSHConnRef, _SSHChannelId, []) -> ok. + +% call functions on a remote Erlang node +call_functions(_Node, []) -> + ok; +call_functions(Node, [{M, F, A}|Functions]) -> + rpc:call(Node, M, F, A), + call_functions(Node, Functions). + +% wait N seconds until node is pingable +wait_for_node_alive(_Node, 0) -> + pang; +wait_for_node_alive(Node, N) -> + timer:sleep(1000), + case net_adm:ping(Node) of + pong-> + pong; + pang-> + wait_for_node_alive(Node, N-1) + end. + +% call init:stop on a remote node +do_stop(ENode) -> + do_stop(ENode, fetch_options([])). +do_stop(ENode, Options) -> + {Cover,MainCoverNode} = + case test_server:is_cover() of + true -> + Main = cover:get_main_node(), + rpc:call(Main,cover,flush,[ENode]), + {true,Main}; + false -> + {false,undefined} + end, + spawn(ENode, init, stop, []), + StopTimeout = Options#options.stop_timeout, + case wait_for_node_dead(ENode, StopTimeout) of + {ok,ENode} -> + if Cover -> + %% To avoid that cover is started again if a node + %% with the same name is started later. + rpc:call(MainCoverNode,cover,stop,[ENode]); + true -> + ok + end, + {ok,ENode}; + Error -> + Error + end. + +% wait N seconds until node is disconnected +wait_for_node_dead(Node, 0) -> + {error, stop_timeout, Node}; +wait_for_node_dead(Node, N) -> + timer:sleep(1000), + case lists:member(Node, nodes()) of + true-> + wait_for_node_dead(Node, N-1); + false-> + {ok, Node} + end. diff --git a/test/utils/test_utils.erl b/test/utils/test_utils.erl index f9ac2e2ce..44aea4136 100644 --- a/test/utils/test_utils.erl +++ b/test/utils/test_utils.erl @@ -111,17 +111,22 @@ start_node(Name, Config) -> CodePath = lists:filter(fun filelib:is_dir/1, code:get_path()), ct:log("Starting node ~p", [Name]), + {ok, Cwd} = file:get_cwd(), + AntidoteFolder = filename:dirname(filename:dirname(Cwd)), + PrivDir = proplists:get_value(priv_dir, Config), + NodeDir = filename:join([PrivDir, Name]) ++ "/", + filelib:ensure_dir(NodeDir), + %% have the slave nodes monitor the runner node, so they can't outlive it NodeConfig = [ {monitor_master, true}, {erl_flags, "-smp"}, %% smp for the eleveldb god + {cd, NodeDir}, {startup_functions, [ {code, set_path, [CodePath]} ]}], - case ct_slave:start(Name, NodeConfig) of + case ct_slave_ext:start(Name, NodeConfig) of {ok, Node} -> - PrivDir = proplists:get_value(priv_dir, Config), - NodeDir = filename:join([PrivDir, Node]), ct:log("Starting riak_core"), ok = rpc:call(Node, application, load, [riak_core]), @@ -140,7 +145,7 @@ start_node(Name, Config) -> ok = rpc:call(Node, application, set_env, [riak_core, platform_data_dir, PlatformDir]), ok = rpc:call(Node, application, set_env, [riak_core, handoff_port, web_ports(Name) + 3]), - ok = rpc:call(Node, application, set_env, [riak_core, schema_dirs, ["../../_build/default/rel/antidote/lib/"]]), + ok = rpc:call(Node, application, set_env, [riak_core, schema_dirs, [AntidoteFolder ++ "/_build/default/rel/antidote/lib/"]]), ok = rpc:call(Node, application, set_env, [ranch, pb_port, web_ports(Name) + 2]), @@ -159,7 +164,7 @@ start_node(Name, Config) -> Node; {error, Reason, Node} -> ct:pal("Error starting node ~w, reason ~w, will retry", [Node, Reason]), - ct_slave:stop(Name), + ct_slave_ext:stop(Name), time_utils:wait_until_offline(Node), start_node(Name, Config) end. @@ -175,7 +180,7 @@ kill_and_restart_nodes(NodeList, Config) -> %% @doc Kills all given nodes, crashes if one node cannot be stopped -spec kill_nodes([node()]) -> [node()]. kill_nodes(NodeList) -> - lists:map(fun(Node) -> {ok, Name} = ct_slave:stop(get_node_name(Node)), Name end, NodeList). + lists:map(fun(Node) -> {ok, Name} = ct_slave_ext:stop(get_node_name(Node)), Name end, NodeList). %% @doc Send force kill signals to all given nodes @@ -188,7 +193,7 @@ brutal_kill_nodes(NodeList) -> %% kill -9 after X seconds just in case %% rpc:cast(Node, timer, apply_after, %% [?FORCE_KILL_TIMER, os, cmd, [io_lib:format("kill -9 ~s", [OSPidToKill])]]), - ct_slave:stop(get_node_name(Node)), + ct_slave_ext:stop(get_node_name(Node)), rpc:cast(Node, os, cmd, [io_lib:format("kill -15 ~s", [OSPidToKill])]), Node end, NodeList). @@ -299,7 +304,11 @@ descriptors(Clusters) -> web_ports(dev1) -> 10015; web_ports(dev2) -> 10025; web_ports(dev3) -> 10035; -web_ports(dev4) -> 10045. +web_ports(dev4) -> 10045; +web_ports(clusterdev1) -> 10115; +web_ports(clusterdev2) -> 10125; +web_ports(clusterdev3) -> 10135; +web_ports(clusterdev4) -> 10145. %% Build clusters join_cluster(Nodes) ->