Skip to content

Commit 08a0c8a

Browse files
committed
Gossip WIP
1 parent 7f18be2 commit 08a0c8a

File tree

5 files changed

+108
-20
lines changed

5 files changed

+108
-20
lines changed

cluster2/src/main/java/io/scalecube/cluster2/MemberCodec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ private MutableDirectBuffer encode(Consumer<MemberEncoder> consumer) {
4646

4747
public Member member(Consumer<UnsafeBuffer> consumer) {
4848
consumer.accept(unsafeBuffer);
49+
4950
memberDecoder.wrapAndApplyHeader(unsafeBuffer, 0, headerDecoder);
5051

5152
final UUID id = uuid(memberDecoder.id());
Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,55 @@
11
package io.scalecube.cluster2.gossip;
22

3+
import static io.scalecube.cluster2.UUIDCodec.uuid;
4+
35
import io.scalecube.cluster2.AbstractCodec;
6+
import io.scalecube.cluster2.UUIDCodec;
7+
import io.scalecube.cluster2.sbe.GossipDecoder;
8+
import io.scalecube.cluster2.sbe.GossipEncoder;
9+
import java.util.UUID;
410
import java.util.function.Consumer;
11+
import org.agrona.MutableDirectBuffer;
512
import org.agrona.concurrent.UnsafeBuffer;
613

714
public class GossipCodec extends AbstractCodec {
815

16+
private final GossipEncoder gossipEncoder = new GossipEncoder();
17+
private final GossipDecoder gossipDecoder = new GossipDecoder();
18+
919
public GossipCodec() {}
1020

21+
// Encode
22+
23+
public MutableDirectBuffer encode(Gossip gossip) {
24+
encodedLength = 0;
25+
26+
gossipEncoder.wrapAndApplyHeader(encodedBuffer, 0, headerEncoder);
27+
UUIDCodec.encode(gossip.gossiperId(), gossipEncoder.gossiperId());
28+
gossipEncoder.sequenceId(gossip.sequenceId());
29+
gossipEncoder.putMessage(gossip.message(), 0, gossip.message().length);
30+
31+
encodedLength = headerEncoder.encodedLength() + gossipEncoder.encodedLength();
32+
return encodedBuffer;
33+
}
34+
1135
// Decode
1236

1337
public Gossip gossip(Consumer<UnsafeBuffer> consumer) {
1438
consumer.accept(unsafeBuffer);
1539

16-
// TODO
40+
gossipDecoder.wrapAndApplyHeader(unsafeBuffer, 0, headerDecoder);
41+
42+
final UUID gossiperId = uuid(gossipDecoder.gossiperId());
43+
if (gossiperId == null) {
44+
return null;
45+
}
46+
47+
final long sequenceId = gossipDecoder.sequenceId();
48+
49+
final int messageLength = gossipDecoder.messageLength();
50+
final byte[] message = new byte[messageLength];
51+
gossipDecoder.getMessage(message, 0, messageLength);
1752

18-
return new Gossip(null, null, 1);
53+
return new Gossip(gossiperId, sequenceId, message);
1954
}
2055
}

cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipProtocol.java

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import io.scalecube.cluster2.MemberCodec;
1010
import io.scalecube.cluster2.sbe.GossipMessageDecoder;
1111
import io.scalecube.cluster2.sbe.GossipRequestDecoder;
12-
import io.scalecube.cluster2.sbe.GossipRequestDecoder.GossipsDecoder;
1312
import io.scalecube.cluster2.sbe.MembershipEventDecoder;
1413
import io.scalecube.cluster2.sbe.MembershipEventType;
1514
import io.scalecube.cluster2.sbe.MessageHeaderDecoder;
@@ -36,7 +35,8 @@ public class GossipProtocol extends AbstractAgent {
3635
private final GossipMessageDecoder gossipMessageDecoder = new GossipMessageDecoder();
3736
private final GossipRequestDecoder gossipRequestDecoder = new GossipRequestDecoder();
3837
private final MembershipEventDecoder membershipEventDecoder = new MembershipEventDecoder();
39-
private final GossipCodec codec = new GossipCodec();
38+
private final GossipRequestCodec gossipRequestCodec = new GossipRequestCodec();
39+
private final GossipCodec gossipCodec = new GossipCodec();
4040
private final MemberCodec memberCodec = new MemberCodec();
4141
private final UnsafeBuffer unsafeBuffer = new UnsafeBuffer();
4242
private final String roleName;
@@ -46,6 +46,7 @@ public class GossipProtocol extends AbstractAgent {
4646
private final Map<String, GossipState> gossips = new Object2ObjectHashMap<>();
4747
private final List<Member> remoteMembers = new ArrayList<>();
4848
private final List<Member> gossipMembers = new ArrayList<>();
49+
private final List<Gossip> gossipsToSend = new ArrayList<>();
4950
private final List<String> gossipsToRemove = new ArrayList<>();
5051

5152
public GossipProtocol(
@@ -86,7 +87,7 @@ protected void onTick() {
8687
nextGossipMembers();
8788

8889
for (int i = 0, n = gossipMembers.size(); i < n; i++) {
89-
spreadGossipsTo(period, gossipMembers.get(i));
90+
spreadGossips(period, gossipMembers.get(i));
9091
}
9192

9293
// Sweep gossips
@@ -142,6 +143,33 @@ private void checkGossipSegmentation() {
142143
}
143144
}
144145

146+
private void spreadGossips(long period, Member member) {
147+
nextGossipsToSend(period, member);
148+
149+
final String address = member.address();
150+
final UUID from = localMember.id();
151+
152+
for (int i = 0, n = gossipsToSend.size(); i < n; i++) {
153+
final Gossip gossip = gossipsToSend.get(i);
154+
transport.send(
155+
address, gossipRequestCodec.encode(from, gossip), 0, gossipRequestCodec.encodedLength());
156+
}
157+
}
158+
159+
private void nextGossipsToSend(long period, Member member) {
160+
gossipsToSend.clear();
161+
162+
final int periodsToSpread =
163+
ClusterMath.gossipPeriodsToSpread(config.gossipRepeatMult(), remoteMembers.size() + 1);
164+
165+
for (final GossipState gossipState : gossips.values()) {
166+
if (gossipState.infectionPeriod() + periodsToSpread >= period
167+
&& !gossipState.isInfected(member.id())) {
168+
gossipsToSend.add(gossipState.gossip());
169+
}
170+
}
171+
}
172+
145173
@Override
146174
public void onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
147175
headerDecoder.wrap(buffer, index);
@@ -180,20 +208,20 @@ private void onGossipRequest(GossipRequestDecoder decoder) {
180208
final long period = currentPeriod;
181209
final UUID from = uuid(decoder.from());
182210

183-
for (GossipsDecoder gossipsDecoder : decoder.gossips()) {
184-
final Gossip gossip = codec.gossip(gossipsDecoder::wrapGossip);
185-
GossipState gossipState = gossips.get(gossip.gossipId());
186-
if (ensureSequence(gossip.gossiperId()).add(gossip.sequenceId())) {
187-
if (gossipState == null) { // new gossip
188-
gossipState = new GossipState(gossip, period);
189-
gossips.put(gossip.gossipId(), gossipState);
190-
emitGossipMessage(gossip.message());
191-
}
192-
}
193-
if (gossipState != null) {
194-
gossipState.addToInfected(from);
211+
final Gossip gossip = gossipCodec.gossip(decoder::wrapGossip);
212+
GossipState gossipState = gossips.get(gossip.gossipId());
213+
214+
if (ensureSequence(gossip.gossiperId()).add(gossip.sequenceId())) {
215+
if (gossipState == null) { // new gossip
216+
gossipState = new GossipState(gossip, period);
217+
gossips.put(gossip.gossipId(), gossipState);
218+
emitGossipMessage(gossip.message());
195219
}
196220
}
221+
222+
if (gossipState != null) {
223+
gossipState.addToInfected(from);
224+
}
197225
}
198226

199227
private void emitGossipMessage(byte[] message) {
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package io.scalecube.cluster2.gossip;
2+
3+
import io.scalecube.cluster2.AbstractCodec;
4+
import io.scalecube.cluster2.UUIDCodec;
5+
import io.scalecube.cluster2.sbe.GossipRequestEncoder;
6+
import java.util.UUID;
7+
import org.agrona.MutableDirectBuffer;
8+
9+
public class GossipRequestCodec extends AbstractCodec {
10+
11+
private final GossipRequestEncoder gossipRequestEncoder = new GossipRequestEncoder();
12+
private final GossipCodec gossipCodec = new GossipCodec();
13+
14+
public GossipRequestCodec() {}
15+
16+
public MutableDirectBuffer encode(UUID from, Gossip gossip) {
17+
encodedLength = 0;
18+
19+
gossipRequestEncoder.wrapAndApplyHeader(encodedBuffer, 0, headerEncoder);
20+
UUIDCodec.encode(from, gossipRequestEncoder.from());
21+
gossipRequestEncoder.putGossip(gossipCodec.encode(gossip), 0, gossipCodec.encodedLength());
22+
23+
encodedLength = headerEncoder.encodedLength() + gossipRequestEncoder.encodedLength();
24+
return encodedBuffer;
25+
}
26+
}

cluster2/src/main/resources/scalecube-cluster-codecs.xml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,7 @@
103103

104104
<sbe:message name="GossipRequest" id="202">
105105
<field name="from" id="1" type="UUID"/>
106-
<group name="gossips" id="100">
107-
<data name="gossip" id="1" type="VarData"/>
108-
</group>
106+
<data name="gossip" id="100" type="VarData"/>
109107
</sbe:message>
110108

111109
<!-- Cluster Membership -->

0 commit comments

Comments
 (0)