diff --git a/src/wa_raft_log_catchup.erl b/src/wa_raft_log_catchup.erl index 9e831b8..5ba05e7 100644 --- a/src/wa_raft_log_catchup.erl +++ b/src/wa_raft_log_catchup.erl @@ -53,6 +53,7 @@ table :: wa_raft:table(), partition :: wa_raft:partition(), self :: #raft_identity{}, + identifier :: #raft_identifier{}, distribution_module :: module(), log :: wa_raft_log:log(), @@ -157,7 +158,8 @@ registered_name(Table, Partition) -> %% RAFT log catchup server implementation -spec init(Options :: #raft_options{}) -> {ok, #state{}, timeout()}. init(#raft_options{application = Application, table = Table, partition = Partition, self = Self, - distribution_module = DistributionModule, log_name = LogName, log_module = LogModule, + identifier = Identifier, distribution_module = DistributionModule, + log_name = LogName, log_module = LogModule, log_catchup_name = Name, server_name = Server}) -> process_flag(trap_exit, true), @@ -178,6 +180,7 @@ init(#raft_options{application = Application, table = Table, partition = Partiti table = Table, partition = Partition, self = Self, + identifier = Identifier, distribution_module = DistributionModule, log = Log, server_name = Server @@ -252,7 +255,8 @@ send_logs(Peer, NextLogIndex, LeaderTerm, LeaderCommitIndex, Witness, #state{nam NewState. -spec send_logs_impl(#raft_identity{}, wa_raft_log:log_index(), wa_raft_log:log_term(), wa_raft_log:log_index(), boolean(), #state{}) -> term(). -send_logs_impl(#raft_identity{node = PeerNode} = Peer, NextLogIndex, LeaderTerm, LeaderCommitIndex, Witness, #state{application = App, name = Name, self = Self, distribution_module = DistributionModule, server_name = Server, log = Log} = State) -> +send_logs_impl(#raft_identity{node = PeerNode} = Peer, NextLogIndex, LeaderTerm, LeaderCommitIndex, Witness, + #state{application = App, name = Name, self = Self, identifier = Identifier, distribution_module = DistributionModule, server_name = Server, log = Log} = State) -> PrevLogIndex = NextLogIndex - 1, {ok, PrevLogTerm} = wa_raft_log:term(Log, PrevLogIndex), @@ -277,7 +281,7 @@ send_logs_impl(#raft_identity{node = PeerNode} = Peer, NextLogIndex, LeaderTerm, Command = wa_raft_server:make_rpc(Self, LeaderTerm, ?APPEND_ENTRIES(PrevLogIndex, PrevLogTerm, Entries, LeaderCommitIndex, 0)), Timeout = ?RAFT_CATCHUP_HEARTBEAT_TIMEOUT(), - try wa_raft_server:parse_rpc(Self, DistributionModule:call(Dest, Command, Timeout)) of + try wa_raft_server:parse_rpc(Self, DistributionModule:call(Dest, Identifier, Command, Timeout)) of {LeaderTerm, _, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, true, FollowerEndIndex)} -> send_logs_impl(Peer, FollowerEndIndex + 1, LeaderTerm, LeaderCommitIndex, Witness, State); {LeaderTerm, _, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, false, _FollowerEndIndex)} ->