diff --git a/pom.xml b/pom.xml index decc602..a16fee6 100644 --- a/pom.xml +++ b/pom.xml @@ -279,12 +279,6 @@ ${junit.version} test - - org.junit.vintage - junit-vintage-engine - ${junit.version} - test - org.assertj assertj-core diff --git a/src/main/java/com/pivovarit/gatherers/LastGatherer.java b/src/main/java/com/pivovarit/gatherers/LastGatherer.java index d0c8475..0ab3b0d 100644 --- a/src/main/java/com/pivovarit/gatherers/LastGatherer.java +++ b/src/main/java/com/pivovarit/gatherers/LastGatherer.java @@ -1,11 +1,13 @@ package com.pivovarit.gatherers; -import java.util.LinkedList; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Gatherer; -record LastGatherer(long n) implements Gatherer, T> { +record LastGatherer( + long n) implements Gatherer, T> { LastGatherer { if (n <= 0) { @@ -14,25 +16,46 @@ record LastGatherer(long n) implements Gatherer, T> { } @Override - public Supplier> initializer() { - return LinkedList::new; + public Supplier> initializer() { + return () -> new LastGatherer.AppendOnlyCircularBuffer<>((int) n); } @Override - public Integrator, T, T> integrator() { + public Integrator, T, T> integrator() { return Integrator.ofGreedy((state, element, _) -> { - if (state.size() == n) { - state.removeFirst(); - state.addLast(element); - } else { - state.addLast(element); - } + state.add(element); return true; }); } @Override - public BiConsumer, Downstream> finisher() { + public BiConsumer, Downstream> finisher() { return (state, downstream) -> state.forEach(downstream::push); } + + static class AppendOnlyCircularBuffer { + private final T[] buffer; + private final int maxSize; + private final AtomicInteger endIdx = new AtomicInteger(0); + private final AtomicInteger size = new AtomicInteger(0); + + public AppendOnlyCircularBuffer(int size) { + this.maxSize = size; + this.buffer = (T[]) new Object[size]; + } + + public void add(T element) { + buffer[endIdx.getAndIncrement() % maxSize] = element; + if (size.get() < maxSize) { + size.incrementAndGet(); + } + } + + public void forEach(Consumer consumer) { + int startIdx = (endIdx.get() - size.get() + maxSize) % maxSize; + for (int i = 0; i < size.get(); i++) { + consumer.accept(buffer[(startIdx + i) % maxSize]); + } + } + } }