Skip to content

Commit

Permalink
Use append-only circular buffer for LastGatherer (#43)
Browse files Browse the repository at this point in the history
Use a custom-crafted append-only circular buffer implementation as backing the implementation of `LastGatherer` for the sake of better performance

<img width="1432" alt="Screenshot 2024-10-15 at 13 16 46"
src="https://github.com/user-attachments/assets/29454d5e-b455-4b21-a92c-da6e2b0e8a92">
  • Loading branch information
pivovarit authored Oct 15, 2024
1 parent 1095fc4 commit d4bf153
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 18 deletions.
6 changes: 0 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,6 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
47 changes: 35 additions & 12 deletions src/main/java/com/pivovarit/gatherers/LastGatherer.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.pivovarit.gatherers;

import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

record LastGatherer<T>(long n) implements Gatherer<T, LinkedList<T>, T> {
record LastGatherer<T>(
long n) implements Gatherer<T, LastGatherer.AppendOnlyCircularBuffer<T>, T> {

LastGatherer {
if (n <= 0) {
Expand All @@ -14,25 +16,46 @@ record LastGatherer<T>(long n) implements Gatherer<T, LinkedList<T>, T> {
}

@Override
public Supplier<LinkedList<T>> initializer() {
return LinkedList::new;
public Supplier<LastGatherer.AppendOnlyCircularBuffer<T>> initializer() {
return () -> new LastGatherer.AppendOnlyCircularBuffer<>((int) n);
}

@Override
public Integrator<LinkedList<T>, T, T> integrator() {
public Integrator<LastGatherer.AppendOnlyCircularBuffer<T>, T, T> integrator() {
return Integrator.ofGreedy((state, element, _) -> {
if (state.size() == n) {
state.removeFirst();
state.addLast(element);
} else {
state.addLast(element);
}
state.add(element);
return true;
});
}

@Override
public BiConsumer<LinkedList<T>, Downstream<? super T>> finisher() {
public BiConsumer<LastGatherer.AppendOnlyCircularBuffer<T>, Downstream<? super T>> finisher() {
return (state, downstream) -> state.forEach(downstream::push);
}

static class AppendOnlyCircularBuffer<T> {
private final T[] buffer;
private final int maxSize;
private final AtomicInteger endIdx = new AtomicInteger(0);
private final AtomicInteger size = new AtomicInteger(0);

public AppendOnlyCircularBuffer(int size) {
this.maxSize = size;
this.buffer = (T[]) new Object[size];
}

public void add(T element) {
buffer[endIdx.getAndIncrement() % maxSize] = element;
if (size.get() < maxSize) {
size.incrementAndGet();
}
}

public void forEach(Consumer<T> consumer) {
int startIdx = (endIdx.get() - size.get() + maxSize) % maxSize;
for (int i = 0; i < size.get(); i++) {
consumer.accept(buffer[(startIdx + i) % maxSize]);
}
}
}
}

0 comments on commit d4bf153

Please sign in to comment.