Skip to content

Commit

Permalink
the byte to short refactor for nodeIdentifider
Browse files Browse the repository at this point in the history
  • Loading branch information
simbo1905 committed Dec 31, 2024
1 parent f2853a6 commit 5435473
Show file tree
Hide file tree
Showing 33 changed files with 152 additions and 144 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ This is achieved by encoding the node identifier in each `N`s lower bits.
This library uses a record with a signature similar to this:

```java
public record BallotNumber(int counter, byte nodeIdentifier) implements Comparable<BallotNumber> {
public record BallotNumber(int counter, short nodeIdentifier) implements Comparable<BallotNumber> {
}
```

The `compareTo` method treats the four-byte counter as having the most significant bits and the
single-byte `nodeIndentifier` as having the least significant bits. The cluster operator must ensure they assign unique
two-byte `nodeIndentifier` as having the least significant bits. The cluster operator must ensure they assign unique
`nodeIdentifier` values to every node added to the cluster.

In this implementation, nodes never recycle their numbers. They increment their counter each time they attempt to lead.
Expand Down Expand Up @@ -150,7 +150,7 @@ public record Command(String uuid,
byte[] operationBytes) {
}

public record BallotNumber(int counter, byte nodeIdentifier) {
public record BallotNumber(int counter, short nodeIdentifier) {
}

public record Accept(long logIndex,
Expand Down Expand Up @@ -404,7 +404,7 @@ The list of tasks:
- [x] Write a test harness that injects rolling network partitions.
- [x] Write property based tests to exhaustively verify correctness.
- [x] Write extensive documentation including detailed JavaDoc.
- [ ] Include a record based distributed CAS as a default embedded pattern.
- [ ] Write a transport for a demo. As Kwik does not support connection failover will start with my own UDP.
- [ ] Implement distributed advisor lock service as a full demo.
- [ ] Implement cluster membership changes as UPaxos.
- [ ] Add optionality so that randomized timeouts can be replaced by some other leader failure detection (e.g. JGroups).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
/// nodeIdentifier in the least significant fifth byte. This works as long as we make the nodeIdentifier unique within the cluster
/// at any given configuration. It must also be unique across the overlaps of cluster membership reconfigurations. We can use Paxos itself to
/// ensure this uniqueness.
public record BallotNumber(int counter, byte nodeIdentifier) implements Comparable<BallotNumber> {
public record BallotNumber(int counter, Short nodeIdentifier) implements Comparable<BallotNumber> {

public static final BallotNumber MIN = new BallotNumber(Integer.MIN_VALUE, Byte.MIN_VALUE);
public static final BallotNumber MIN = new BallotNumber(Integer.MIN_VALUE, Short.MIN_VALUE);

@Override
public int compareTo(BallotNumber that) {
if (this.counter == that.counter) {
return Byte.compare(this.nodeIdentifier, that.nodeIdentifier);
return Short.compare(this.nodeIdentifier, that.nodeIdentifier);
}
return Integer.compare(this.counter, that.counter);
}
Expand Down
2 changes: 1 addition & 1 deletion trex-lib/src/main/java/com/github/trex_paxos/Journal.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public interface Journal {
///
/// @param nodeIdentifier The node identifier to load the progress record for. To avoid accidentally loading the wrong
/// history when moving nodes between servers we require the node identifier. This is only a safety feature.
Progress readProgress(byte nodeIdentifier);
Progress readProgress(short nodeIdentifier);

/// Save a value into the log.
/// Logically this method is storing `accept(S,N,V)` so it needs to store the values `{N,V}` at log slot `S`
Expand Down
2 changes: 1 addition & 1 deletion trex-lib/src/main/java/com/github/trex_paxos/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

public interface Message {
/// @return the node in the cluster that sent this message.
byte from();
short from();
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public PaxosService(TrexEngine engine,
}

/// Shorthand to get the node id which must be unique in the paxos cluster. It is th responsibility of the cluster owner to ensure that it is unique.
public byte nodeId() {
public short nodeId() {
return engine.trexNode.nodeIdentifier();
}

Expand Down
12 changes: 6 additions & 6 deletions trex-lib/src/main/java/com/github/trex_paxos/Pickle.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static byte[] writeProgress(Progress progress) throws IOException {
}

public static void write(Progress progress, DataOutputStream dos) throws IOException {
dos.writeByte(progress.nodeIdentifier());
dos.writeShort(progress.nodeIdentifier());
write(progress.highestPromised(), dos);
dos.writeLong(progress.highestFixedIndex());
}
Expand All @@ -51,7 +51,7 @@ public static Progress readProgress(byte[] pickled) throws IOException {
}

private static Progress readProgress(DataInputStream dis) throws IOException {
return new Progress(dis.readByte(), readBallotNumber(dis), dis.readLong());
return new Progress(dis.readShort(), readBallotNumber(dis), dis.readLong());
}

public static byte[] write(BallotNumber n) throws IOException {
Expand All @@ -64,7 +64,7 @@ public static byte[] write(BallotNumber n) throws IOException {

public static void write(BallotNumber n, DataOutputStream dataOutputStream) throws IOException {
dataOutputStream.writeInt(n.counter());
dataOutputStream.writeByte(n.nodeIdentifier());
dataOutputStream.writeShort(n.nodeIdentifier());
}

public static BallotNumber readBallotNumber(byte[] pickled) throws IOException {
Expand All @@ -75,18 +75,18 @@ public static BallotNumber readBallotNumber(byte[] pickled) throws IOException {
}

public static BallotNumber readBallotNumber(DataInputStream dataInputStream) throws IOException {
return new BallotNumber(dataInputStream.readInt(), dataInputStream.readByte());
return new BallotNumber(dataInputStream.readInt(), dataInputStream.readShort());
}

public static void write(Accept m, DataOutputStream dataStream) throws IOException {
dataStream.writeByte(m.from());
dataStream.writeShort(m.from());
dataStream.writeLong(m.slot());
write(m.number(), dataStream);
write(m.command(), dataStream);
}

public static Accept readAccept(DataInputStream dataInputStream) throws IOException {
final byte from = dataInputStream.readByte();
final short from = dataInputStream.readShort();
final long logIndex = dataInputStream.readLong();
final BallotNumber number = readBallotNumber(dataInputStream);
final var command = readCommand(dataInputStream);
Expand Down
4 changes: 2 additions & 2 deletions trex-lib/src/main/java/com/github/trex_paxos/Progress.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* @param highestFixedIndex The highest log index that has been learnt to have been fixed and so not fixed.
*/
public record Progress(
byte nodeIdentifier,
short nodeIdentifier,
BallotNumber highestPromised,
long highestFixedIndex
) {
Expand All @@ -34,7 +34,7 @@ public record Progress(
*
* @param nodeIdentifier The current node identifier.
*/
public Progress(byte nodeIdentifier) {
public Progress(short nodeIdentifier) {
this(nodeIdentifier, BallotNumber.MIN, 0);
}

Expand Down
19 changes: 13 additions & 6 deletions trex-lib/src/main/java/com/github/trex_paxos/TrexNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public boolean isCrashed() {
/// @param nodeIdentifier The unique node identifier. This must be unique across the cluster and across enough time for prior messages to have been forgotten.
/// @param quorumStrategy The quorum strategy that may be a simple majority, else things like FPaxos or UPaxos
/// @param journal The durable storage and durable log. This must be pre-initialised.
public TrexNode(Level logAtLevel, byte nodeIdentifier, QuorumStrategy quorumStrategy, Journal journal) {
public TrexNode(Level logAtLevel, short nodeIdentifier, QuorumStrategy quorumStrategy, Journal journal) {
this.nodeIdentifier = nodeIdentifier;
this.journal = journal;
this.quorumStrategy = quorumStrategy;
Expand All @@ -92,7 +92,7 @@ public TrexNode(Level logAtLevel, byte nodeIdentifier, QuorumStrategy quorumStra
}

/// The current node identifier. This must be globally unique in the cluster.
final byte nodeIdentifier;
final short nodeIdentifier;

/// The durable storage and durable log.
final Journal journal;
Expand All @@ -108,7 +108,7 @@ public TrexNode(Level logAtLevel, byte nodeIdentifier, QuorumStrategy quorumStra
Progress progress;

/// During a recovery we will track all the slots that we are probing to find the highest accepted operationBytes.
final NavigableMap<Long, Map<Byte, PrepareResponse>> prepareResponsesByLogIndex = new TreeMap<>();
final NavigableMap<Long, Map<Short, PrepareResponse>> prepareResponsesByLogIndex = new TreeMap<>();

/// When leading we will track the responses to a stream of accept messages.
final NavigableMap<Long, AcceptVotes> acceptVotesByLogIndex = new TreeMap<>();
Expand Down Expand Up @@ -195,6 +195,7 @@ private void algorithm(TrexMessage input,
return;
}
switch (input) {

case Accept accept -> {
final var number = accept.slotTerm().number();
final var logIndex = accept.slotTerm().logIndex();
Expand Down Expand Up @@ -368,6 +369,12 @@ case CatchupResponse(_, _, final var catchup) -> {
journal.writeProgress(progress);
}
}
default -> {
// this is unreachable but on refactor to SHORT the compiler complaines FIXME
LOGGER.severe("Unknown message type: " + input);
throw new IllegalArgumentException("Unknown message type: " + input);
}

}
}

Expand Down Expand Up @@ -612,7 +619,7 @@ public long highestFixed() {
return progress.highestFixedIndex();
}

public byte nodeIdentifier() {
public short nodeIdentifier() {
return nodeIdentifier;
}

Expand Down Expand Up @@ -703,7 +710,7 @@ protected void setRole(TrexRole role) {
}

private void processPrepareResponse(PrepareResponse prepareResponse, List<TrexMessage> messages) {
final byte from = prepareResponse.from();
final var from = prepareResponse.from();
final long logIndex = prepareResponse.vote().logIndex();
final var votes = prepareResponsesByLogIndex.computeIfAbsent(logIndex, _ -> new HashMap<>());
votes.put(from, prepareResponse);
Expand Down Expand Up @@ -794,7 +801,7 @@ public Progress progress() {
/**
* A record of the votes received by a node from other cluster members.
*/
public record AcceptVotes(SlotTerm accept, Map<Byte, AcceptResponse> responses, boolean chosen) {
public record AcceptVotes(SlotTerm accept, Map<Short, AcceptResponse> responses, boolean chosen) {
public AcceptVotes(SlotTerm slotTerm) {
this(slotTerm, new HashMap<>(), false);
}
Expand Down
2 changes: 1 addition & 1 deletion trex-lib/src/main/java/com/github/trex_paxos/Value.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import java.util.UUID;

public record Value(byte from, UUID uuid, byte[] bytes) implements Message {
public record Value(short from, UUID uuid, byte[] bytes) implements Message {
public Value {
if (uuid == null) {
throw new IllegalArgumentException("uuid cannot be null");
Expand Down
4 changes: 2 additions & 2 deletions trex-lib/src/main/java/com/github/trex_paxos/msg/Accept.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
/// @param from see {@link TrexMessage}
/// @param slotTerm This is the `{S,N}` that identifies the fixed `V`.
/// @param command The command to be accepted by the acceptor. This may be a NOOP or a client command.
public record Accept(byte from,
public record Accept(short from,
SlotTerm slotTerm,
AbstractCommand command) implements TrexMessage, BroadcastMessage, PaxosMessage {

public Accept(byte from, long logIndex, BallotNumber number, AbstractCommand command) {
public Accept(short from, long logIndex, BallotNumber number, AbstractCommand command) {
this(from, new SlotTerm(logIndex, number), command);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@
/// @param to see {@link DirectMessage}
/// @param vote whether wre have voted for or voted against the Prepare message based on our past promises.
/// @param highestFixedIndex additional information about the highest accepted index so that a leader will abdicate if it is behind.
public record AcceptResponse(byte from,
byte to,
public record AcceptResponse(short from,
short to,
Vote vote,
long highestFixedIndex
) implements TrexMessage, DirectMessage, LearningMessage {
public record Vote(
// spookily intellij says there are no usages of this field, but if I remove it everything breaks
byte from,
short from,
// spookily intellij says there are no usages of this field, but if I remove it everything breaks
byte to,
short to,
long logIndex,
boolean vote
) {
Expand Down
9 changes: 4 additions & 5 deletions trex-lib/src/main/java/com/github/trex_paxos/msg/Catchup.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@
/// @param to see {@link DirectMessage}
/// @param highestFixedIndex the highest index that the replica has fixed.
/// @param highestPromised the highest ballot number that the replica has promised.
public record Catchup(byte from,
byte to,
long highestFixedIndex,
BallotNumber highestPromised
) implements DirectMessage, TrexMessage {
public record Catchup(short from,
short to,
long highestFixedIndex,
BallotNumber highestPromised) implements DirectMessage, TrexMessage {

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
/// @param from see {@link TrexMessage}
/// @param to see {@link DirectMessage}
/// @param accepts the list of fixed accepts above the slot index requested.
public record CatchupResponse(byte from,
byte to,
public record CatchupResponse(short from,
short to,
List<Accept> accepts
) implements TrexMessage, DirectMessage, LearningMessage {
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
public sealed interface DirectMessage extends TrexMessage
permits AcceptResponse, Catchup, CatchupResponse, PrepareResponse {
/// @return the node in the cluster that this message is intended for.
byte to();
short to();
}
4 changes: 2 additions & 2 deletions trex-lib/src/main/java/com/github/trex_paxos/msg/Fixed.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
/// @param from see {@link TrexMessage}
/// @param slotTerm This is the `{S,N}` that identifies the fixed `V`. `
public record Fixed(
byte from,
short from,
SlotTerm slotTerm
) implements TrexMessage, BroadcastMessage, LearningMessage {
public Fixed(byte from, long logIndex, BallotNumber number) {
public Fixed(short from, long logIndex, BallotNumber number) {
this(from, new SlotTerm(logIndex, number));
}

Expand Down
4 changes: 2 additions & 2 deletions trex-lib/src/main/java/com/github/trex_paxos/msg/Prepare.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
/// @param from The node identifier of the proposer used to route the message and self-accept.
/// @param slotTerm This is the `{S,N}` where a successful leader will select the highest `V`.
public record Prepare(
byte from,
short from,
SlotTerm slotTerm
) implements TrexMessage, BroadcastMessage, PaxosMessage {
public Prepare(byte from, long logIndex, BallotNumber number) {
public Prepare(short from, long logIndex, BallotNumber number) {
this(from, new SlotTerm(logIndex, number));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@
/// @param journaledAccept the highest unfixed log entry if any.
/// @param highestAcceptedIndex additional information about the highest accepted index so that a leader can learn of more slots that it needs to recover.
public record PrepareResponse(
byte from,
byte to,
short from,
short to,
Vote vote,
Optional<Accept> journaledAccept,
long highestAcceptedIndex
) implements TrexMessage, DirectMessage {
public record Vote(
// spookily intellij says there are no usages of this field, but if I remove it everything breaks
byte from,
short from,
// spookily intellij says there are no usages of this field, but if I remove it everything breaks
byte to,
short to,
long logIndex,
boolean vote,
BallotNumber number
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ public sealed interface TrexMessage extends Message permits
Prepare,
PrepareResponse {
/// @return the node in the cluster that sent this message.
byte from();
short from();
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ record TestCase(
@Property(generation = GenerationMode.EXHAUSTIVE)
void acceptTests(@ForAll("testCases") TestCase testCase) {
// Set up the identifier of the node under test
final var thisNodeId = (byte) 2;
final var thisNodeId = (short) 2;

// Set up the identifier of the other node relative to the node under test
final var otherNodeId = switch (testCase.nodeIdentifierRelation) {
case LESS -> (byte) (thisNodeId - 1);
case LESS -> (short) (thisNodeId - 1);
case EQUAL -> thisNodeId;
case GREATER -> (byte) (thisNodeId + 1);
case GREATER -> (short) (thisNodeId + 1);
};

// Setup ballot number of the node under test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ record TestCase(

@Property(generation = GenerationMode.EXHAUSTIVE)
void acceptResponseTests(@ForAll("testCases") TestCase testCase) {
final var thisNodeId = (byte) 2;
final var thisNodeId = (short) 2;

final var otherNodeId = switch (testCase.nodeIdentifierRelation) {
case LESS -> (byte) (thisNodeId - 1);
case LESS -> (short) (thisNodeId - 1);
case EQUAL -> thisNodeId;
case GREATER -> (byte) (thisNodeId + 1);
case GREATER -> (short) (thisNodeId + 1);
};

final var thisCounter = 100;
Expand Down Expand Up @@ -103,7 +103,7 @@ record CreatedData(Accept accept, AcceptVotes votes) {
/// Set up an `accept` and `acceptVotes` for a given slot
private CreatedData createAcceptVotes(long s) {
final var a = new Accept(thisNodeId, s, thisPromise, NoOperation.NOOP);
final Map<Byte, AcceptResponse> responses = new TreeMap<>();
final Map<Short, AcceptResponse> responses = new TreeMap<>();
responses.put(thisNodeId, new AcceptResponse(thisNodeId, thisNodeId,
new AcceptResponse.Vote(thisNodeId, thisNodeId, s, thisVote), s));
AcceptVotes votes = new AcceptVotes(a.slotTerm(), responses, false);
Expand Down
Loading

0 comments on commit 5435473

Please sign in to comment.