Skip to content

Commit

Permalink
[improve][broker][PIP-379] Don't replace a consumer when there's a co…
Browse files Browse the repository at this point in the history
…llision (apache#23441)
  • Loading branch information
lhotari authored Oct 11, 2024
1 parent 807d189 commit 390d7d9
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Range;

/**
* This is a consumer selector using consistent hashing to evenly split
* the number of keys assigned to each consumer.
*/
@Slf4j
public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyConsumerSelector {
// use NUL character as field separator for hash key calculation
private static final String KEY_SEPARATOR = "\0";
Expand Down Expand Up @@ -76,18 +78,36 @@ public CompletableFuture<Optional<ImpactedConsumersResult>> addConsumer(Consumer
ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
// Insert multiple points on the hash ring for every consumer
// The points are deterministically added based on the hash of the consumer name
int hashPointsAdded = 0;
int hashPointCollisions = 0;
for (int i = 0; i < numberOfPoints; i++) {
int consumerNameIndex =
consumerNameIndexTracker.increaseConsumerRefCountAndReturnIndex(consumerIdentityWrapper);
int hash = calculateHashForConsumerAndIndex(consumer, consumerNameIndex, i);
// When there's a collision, the new consumer will replace the old one.
// This is a rare case, and it is acceptable to replace the old consumer since there
// are multiple points for each consumer. This won't affect the overall distribution significantly.
ConsumerIdentityWrapper removed = hashRing.put(hash, consumerIdentityWrapper);
if (removed != null) {
consumerNameIndexTracker.decreaseConsumerRefCount(removed);
// When there's a collision, the entry won't be added to the hash ring.
// This isn't a problem with the consumerNameIndexTracker solution since the collisions won't align
// for all hash ring points when using the same consumer name. This won't affect the overall
// distribution significantly when the number of hash ring points is sufficiently large (>100).
ConsumerIdentityWrapper existing = hashRing.putIfAbsent(hash, consumerIdentityWrapper);
if (existing != null) {
hashPointCollisions++;
// reduce the ref count which was increased before adding since the consumer was not added
consumerNameIndexTracker.decreaseConsumerRefCount(consumerIdentityWrapper);
} else {
hashPointsAdded++;
}
}
if (hashPointsAdded == 0) {
log.error("Failed to add consumer '{}' to the hash ring. There were {} collisions. Consider increasing "
+ "the number of points ({}) per consumer by setting "
+ "subscriptionKeySharedConsistentHashingReplicaPoints={}",
consumer, hashPointCollisions, numberOfPoints,
Math.max((int) (numberOfPoints * 1.5d), numberOfPoints + 1));
}
if (log.isDebugEnabled()) {
log.debug("Added consumer '{}' with {} points, {} collisions", consumer, hashPointsAdded,
hashPointCollisions);
}
if (!addOrRemoveReturnsImpactedConsumersResult) {
return CompletableFuture.completedFuture(Optional.empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public static ConsumerHashAssignmentsSnapshot empty() {
return new ConsumerHashAssignmentsSnapshot(Collections.emptyList());
}

public ImpactedConsumersResult resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot other) {
return resolveConsumerRemovedHashRanges(this.hashRangeAssignments, other.hashRangeAssignments);
public ImpactedConsumersResult resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot assignmentsAfter) {
return resolveConsumerRemovedHashRanges(this.hashRangeAssignments, assignmentsAfter.hashRangeAssignments);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,11 +475,11 @@ public void testShouldNotChangeSelectedConsumerWhenConsumerIsAdded() {
}

@Test
public void testShouldContainMinimalMappingChangesWhenConsumerLeavesAndRejoins() {
public void testShouldNotContainMappingChangesWhenConsumersLeaveAndRejoinInSameOrder() {
final ConsistentHashingStickyKeyConsumerSelector selector =
new ConsistentHashingStickyKeyConsumerSelector(100, true);
new ConsistentHashingStickyKeyConsumerSelector(200, true);
final String consumerName = "consumer";
final int numOfInitialConsumers = 10;
final int numOfInitialConsumers = 200;
List<Consumer> consumers = new ArrayList<>();
for (int i = 0; i < numOfInitialConsumers; i++) {
final Consumer consumer = createMockConsumer(consumerName, "index " + i, i);
Expand All @@ -498,14 +498,8 @@ public void testShouldContainMinimalMappingChangesWhenConsumerLeavesAndRejoins()
selector.addConsumer(consumers.get(numOfInitialConsumers / 2));

ConsumerHashAssignmentsSnapshot assignmentsAfter = selector.getConsumerHashAssignmentsSnapshot();
int removedRangesSize = assignmentsBefore.diffRanges(assignmentsAfter).keySet().stream()
.mapToInt(Range::size)
.sum();
double allowedremovedRangesPercentage = 1; // 1%
int hashRangeSize = selector.getKeyHashRange().size();
int allowedremovedRanges = (int) (hashRangeSize * (allowedremovedRangesPercentage / 100.0d));
assertThat(removedRangesSize).describedAs("Allow up to %d%% of total hash range size to be impacted",
allowedremovedRangesPercentage).isLessThan(allowedremovedRanges);

assertThat(assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges()).isEmpty();
}

@Test
Expand Down

0 comments on commit 390d7d9

Please sign in to comment.