Skip to content

Commit

Permalink
wip...
Browse files Browse the repository at this point in the history
  • Loading branch information
pruivo committed Jan 16, 2023
1 parent d113a8c commit a662294
Show file tree
Hide file tree
Showing 15 changed files with 550 additions and 47 deletions.
60 changes: 49 additions & 11 deletions src/org/jgroups/gossiprouter/GossipRouterGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import org.jgroups.Address;
import org.jgroups.PhysicalAddress;
import org.jgroups.blocks.cs.BaseServer;
import org.jgroups.gossiprouter.metrics.GossipRouterGroupMetrics;
import org.jgroups.gossiprouter.metrics.GossipRouterMetrics;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
import org.jgroups.protocols.PingData;
Expand Down Expand Up @@ -33,11 +35,14 @@ public class GossipRouterGroup {
private final BaseServer server;
private final boolean printRegistrationToStdOut;
private final ConcurrentHashMap<Address, GossipRouterMember> members = new ConcurrentHashMap<>(16);
private final GossipRouterGroupMetrics metrics;

public GossipRouterGroup(String name, BaseServer server, boolean printRegistrationToStdOut) {
public GossipRouterGroup(String name, BaseServer server, boolean printRegistrationToStdOut, GossipRouterMetrics metrics) {
this.name = name;
this.server = server;
this.printRegistrationToStdOut = printRegistrationToStdOut;
this.metrics = metrics.createGroupMetrics(name);
this.metrics.setGroupSize(members::size);
}

private static PingData createPingData(Map.Entry<Address, GossipRouterMember> entry) {
Expand All @@ -47,14 +52,17 @@ private static PingData createPingData(Map.Entry<Address, GossipRouterMember> en


public void registerMember(Address logicalAddress, Address clientAddress, PhysicalAddress physicalAddress, String logicalName) {
GossipRouterMember m = new GossipRouterMember(clientAddress, physicalAddress, logicalName);
metrics.incrementRegisterEvents();
GossipRouterMember m = GossipRouterMember.create(logicalAddress, clientAddress, physicalAddress, logicalName, metrics);
members.put(logicalAddress, m);
logRegistered(m);
}

public boolean unregisterMember(Address logicalAddress) {
metrics.incrementUnregisterEvents();
GossipRouterMember m = members.remove(logicalAddress);
if (m != null) {
m.onUnregister();
logUnregistered(m);
}
return isEmpty();
Expand All @@ -68,6 +76,7 @@ public boolean onDisconnect(Address clientAddress, Consumer<Address> removedMemb
if (!m.getClientAddress().equals(clientAddress)) {
continue;
}
m.onDisconnect();
iterator.remove();
log.debug("connection to %s closed", clientAddress);
logUnregistered(m);
Expand Down Expand Up @@ -108,6 +117,7 @@ public void dumpMappings(StringBuilder sb) {
}

private void sendSuspect(Address suspect) {
metrics.incrementSuspectEvents();
GossipData data = new GossipData(GossipType.SUSPECT, name, suspect);
ByteArrayDataOutputStream out = new ByteArrayDataOutputStream(data.serializedSize());
try {
Expand All @@ -116,39 +126,67 @@ private void sendSuspect(Address suspect) {
log.error("failed marshalling gossip data %s: %s; dropping request", data, ex);
return;
}
sendMulticast(out.buffer(), 0, out.position());
sendMulticast(null, out.buffer(), 0, out.position());
}

public void sendUnicast(Address logicalAddress, byte[] data, int offset, int length) {
GossipRouterMember member = members.get(logicalAddress);
public void sendUnicast(Address src, Address dest, byte[] data, int offset, int length) {
recordUnicastReceived(src, length);
GossipRouterMember member = members.get(dest);
if (member == null) {
log.warn("dest %s in cluster %s not found", logicalAddress, name);
log.warn("dest %s in cluster %s not found", src, name);
return;
}
member.onUnicastMessageSent(length);
sendToMember(member, data, offset, length);
}

public void sendUnicast(Address logicalAddress, ByteBuffer buffer) {
GossipRouterMember member = members.get(logicalAddress);
public void sendUnicast(Address src, Address dst, ByteBuffer buffer) {
int length = buffer.remaining();
recordUnicastReceived(src, length);
GossipRouterMember member = members.get(dst);
if (member == null) {
log.warn("dest %s in cluster %s not found", logicalAddress, name);
log.warn("dest %s in cluster %s not found", dst, name);
return;
}
member.onUnicastMessageSent(length);
sendToMember(member, buffer);
}

public void sendMulticast(byte[] data, int offset, int length) {
public void sendMulticast(Address src, byte[] data, int offset, int length) {
recordUnicastReceived(src, length);
for (GossipRouterMember member : members.values()) {
member.onMulticastMessageSent(length);
sendToMember(member, data, offset, length);
}
}

public void sendMulticast(ByteBuffer buffer) {
public void sendMulticast(Address src, ByteBuffer buffer) {
int length = buffer.remaining();
recordMulticastReceived(src, length);
for (GossipRouterMember member : members.values()) {
member.onMulticastMessageSent(length);
sendToMember(member, buffer.duplicate());
}
}

private void recordUnicastReceived(Address src, int length) {
GossipRouterMember sender = getSender(src);
if (sender != null) {
sender.onUnicastMessageReceived(length);
}
}

private void recordMulticastReceived(Address src, int length) {
GossipRouterMember sender = getSender(src);
if (sender != null) {
sender.onMulticastMessageReceived(length);
}
}

private GossipRouterMember getSender(Address src) {
return src == null ? null : members.get(src);
}

private void logRegistered(GossipRouterMember m) {
if (log.isDebugEnabled())
log.debug("added %s (%s) to group %s", m.getLogicalName(), m.getPhysicalAddress(), name);
Expand Down
35 changes: 34 additions & 1 deletion src/org/jgroups/gossiprouter/GossipRouterMember.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import org.jgroups.Address;
import org.jgroups.PhysicalAddress;
import org.jgroups.gossiprouter.metrics.GossipRouterGroupMetrics;
import org.jgroups.gossiprouter.metrics.GossipRouterMemberMetrics;

/**
* Contains the node details.
Expand All @@ -13,11 +15,18 @@ public class GossipRouterMember {
private final PhysicalAddress physicalAddress;
private final String logicalName;
private final Address clientAddress; // address of the client which registered an item
private final GossipRouterMemberMetrics metrics;

public GossipRouterMember(Address clientAddress, PhysicalAddress physicalAddress, String logicalName) {
private GossipRouterMember(Address clientAddress, PhysicalAddress physicalAddress, String logicalName, GossipRouterMemberMetrics metrics) {
this.physicalAddress = physicalAddress;
this.logicalName = logicalName;
this.clientAddress = clientAddress;
this.metrics = metrics;
}

public static GossipRouterMember create(Address logicalAddress, Address clientAddress, PhysicalAddress physicalAddress, String logicalName, GossipRouterGroupMetrics metricsRegistry) {
GossipRouterMemberMetrics metrics = metricsRegistry.registerMember(logicalAddress, physicalAddress, logicalName);
return new GossipRouterMember(clientAddress, physicalAddress, logicalName, metrics);
}

public PhysicalAddress getPhysicalAddress() {
Expand All @@ -36,6 +45,30 @@ public void dump(StringBuilder sb) {
sb.append(String.format(" %s: (client_address: %s, uuid:%s)\n", logicalName, physicalAddress, clientAddress));
}

public void onUnicastMessageReceived(int bytes) {
metrics.addUnicastMessageReceived(bytes);
}

public void onMulticastMessageReceived(int bytes) {
metrics.addMulticastMessageReceived(bytes);
}

public void onUnicastMessageSent(int bytes) {
metrics.addUnicastMessageSent(bytes);
}

public void onMulticastMessageSent(int bytes) {
metrics.addMulticastMessageSent(bytes);
}

public void onDisconnect() {
metrics.onDisconnect();
}

public void onUnregister() {
metrics.unregister();
}

@Override
public String toString() {
return String.format("client=%s, name=%s, address=%s", clientAddress, logicalName, physicalAddress);
Expand Down
19 changes: 19 additions & 0 deletions src/org/jgroups/gossiprouter/metrics/GossipRouterGroupMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.jgroups.gossiprouter.metrics;

import org.jgroups.Address;
import org.jgroups.PhysicalAddress;

import java.util.function.IntSupplier;

public interface GossipRouterGroupMetrics {

GossipRouterMemberMetrics registerMember(Address logicalAddress, PhysicalAddress physicalAddress, String logicalName);

void setGroupSize(IntSupplier groupSizeSupplier);

void incrementRegisterEvents();

void incrementUnregisterEvents();

void incrementSuspectEvents();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.jgroups.gossiprouter.metrics;

public interface GossipRouterMemberMetrics {
void addUnicastMessageReceived(int bytes);

void addMulticastMessageReceived(int bytes);

void addUnicastMessageSent(int bytes);

void addMulticastMessageSent(int bytes);

void onDisconnect();

void unregister();
}
6 changes: 6 additions & 0 deletions src/org/jgroups/gossiprouter/metrics/GossipRouterMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.jgroups.gossiprouter.metrics;

public interface GossipRouterMetrics {

GossipRouterGroupMetrics createGroupMetrics(String name);
}
77 changes: 77 additions & 0 deletions src/org/jgroups/gossiprouter/metrics/NoOpGossipRouterMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.jgroups.gossiprouter.metrics;

import org.jgroups.Address;
import org.jgroups.PhysicalAddress;

import java.util.function.IntSupplier;

public final class NoOpGossipRouterMetrics implements GossipRouterMetrics {

public static final NoOpGossipRouterMetrics INSTANCE = new NoOpGossipRouterMetrics();
private static final GossipRouterMemberMetrics MEMBER_METRICS = new GossipRouterMemberMetrics() {
@Override
public void addUnicastMessageReceived(int bytes) {

}

@Override
public void addMulticastMessageReceived(int bytes) {

}

@Override
public void addUnicastMessageSent(int bytes) {

}

@Override
public void addMulticastMessageSent(int bytes) {

}

@Override
public void onDisconnect() {

}

@Override
public void unregister() {

}
};
private static final GossipRouterGroupMetrics GROUP_METRICS = new GossipRouterGroupMetrics() {
@Override
public GossipRouterMemberMetrics registerMember(Address logicalAddress, PhysicalAddress physicalAddress, String logicalName) {
return MEMBER_METRICS;
}

@Override
public void setGroupSize(IntSupplier groupSizeSupplier) {

}

@Override
public void incrementRegisterEvents() {

}

@Override
public void incrementUnregisterEvents() {

}

@Override
public void incrementSuspectEvents() {

}
};

private NoOpGossipRouterMetrics() {
}

@Override
public GossipRouterGroupMetrics createGroupMetrics(String name) {
return GROUP_METRICS;
}

}
Loading

0 comments on commit a662294

Please sign in to comment.