Skip to content

Commit

Permalink
Switch to epoch metrics + cache only most recent blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
zilm13 committed Sep 14, 2023
1 parent 70c5e6c commit 1cf2206
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,10 @@ static BlockProvider meteredFalse(
block ->
blocksLabelledCounter
.labels(
"get",
"false",
blockToSlotMetrics.apply(block.getRoot(), block),
"meteredFalse")
"get", "false", blockToSlotMetrics.apply(block.getRoot(), block))
.inc());
IntStream.range(0, blocksMap.size() - blockRoots.size())
.forEach(
__ -> blocksLabelledCounter.labels("get", "false", "-1", "meteredFalse").inc());
.forEach(__ -> blocksLabelledCounter.labels("get", "false", "-1").inc());
});
};
}
Expand Down
1 change: 1 addition & 0 deletions gradle/versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ dependencyManagement {

dependency 'org.apache.commons:commons-text:1.10.0'
dependency 'org.apache.commons:commons-lang3:3.13.0'
dependency 'org.apache.commons:commons-collections4:4.4'
dependency 'commons-io:commons-io:2.13.0'
dependency 'org.commonjava.mimeparse:mimeparse:0.1.3.3'

Expand Down
2 changes: 2 additions & 0 deletions infrastructure/collections/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
dependencies {
implementation 'com.google.guava:guava'
implementation 'org.apache.commons:commons-collections4'


testFixturesImplementation 'it.unimi.dsi:fastutil'
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
package tech.pegasys.teku.infrastructure.collections;

import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.commons.collections4.bidimap.DualTreeBidiMap;

abstract class AbstractLimitedMap<K, V> implements LimitedMap<K, V> {

Expand All @@ -32,6 +35,28 @@ protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
};
}

protected static <K, V> Map<K, V> createSortedLimitedMap(
final int maxSize,
final Comparator<? super K> keyComparator,
final Comparator<? super V> valueComparator) {
final TreeMap<V, K> reverseMap = new TreeMap<>(valueComparator);
return new DualTreeBidiMap<>(new TreeMap<>(keyComparator), reverseMap, null) {
@Override
public V put(final K key, final V value) {
V old = super.put(key, value);
if (size() > maxSize) {
remove(reverseMap.firstEntry().getValue());
}
return old;
}

@Override
public void putAll(final Map<? extends K, ? extends V> map) {
map.forEach(this::put);
}
};
}

protected final Map<K, V> delegate;
protected final int maxSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tech.pegasys.teku.infrastructure.collections;

import com.google.common.cache.CacheBuilder;
import java.util.Comparator;
import java.util.Map;

public interface LimitedMap<K, V> extends Map<K, V> {
Expand Down Expand Up @@ -48,6 +49,13 @@ static <K, V> LimitedMap<K, V> createSynchronized(final int maxSize) {
return new SynchronizedLimitedMap<>(maxSize);
}

static <K, V> LimitedMap<K, V> createSortedSynchronized(
final int maxSize,
final Comparator<? super K> keyComparator,
final Comparator<? super V> valueComparator) {
return new SynchronizedSortedLimitedMap<>(maxSize, keyComparator, valueComparator);
}

/**
* Creates a limited map. The returned map is safe for all forms of concurrent access including
* iteration and evicts the least recently used items.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Consensys Software Inc., 2022
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.infrastructure.collections;

import java.util.Collections;
import java.util.Comparator;

/** Helper that creates a thread-safe sorted map with a maximum capacity. */
final class SynchronizedSortedLimitedMap<K, V> extends AbstractLimitedMap<K, V> {
final Comparator<? super K> keyComparator;
final Comparator<? super V> valueComparator;

public SynchronizedSortedLimitedMap(
final int maxSize,
final Comparator<? super K> keyComparator,
final Comparator<? super V> valueComparator) {
super(
Collections.synchronizedMap(
createSortedLimitedMap(maxSize, keyComparator, valueComparator)),
maxSize);
this.keyComparator = keyComparator;
this.valueComparator = valueComparator;
}

@Override
public LimitedMap<K, V> copy() {
SynchronizedSortedLimitedMap<K, V> map =
new SynchronizedSortedLimitedMap<>(getMaxSize(), keyComparator, valueComparator);
synchronized (delegate) {
map.putAll(delegate);
}
return map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ public boolean isEmpty() {
public boolean containsKey(final Object key) {
final boolean containsKey = delegate.containsKey(key);
if (containsKey) {
labelledCounter.labels("contains", "true", "-1", "containsKey").inc();
labelledCounter.labels("contains", "true", "-1").inc();
} else {
labelledCounter.labels("contains", "false", "-1", "containsKey").inc();
labelledCounter.labels("contains", "false", "-1").inc();
}
return containsKey;
}
Expand All @@ -133,7 +133,7 @@ public boolean containsValue(final Object value) {
public V get(final Object key) {
final V maybeValue = delegate.get(key);
if (maybeValue != null) {
labelledCounter.labels("get", "true", valueFunction.apply((K) key, maybeValue), "get").inc();
labelledCounter.labels("get", "true", valueFunction.apply((K) key, maybeValue)).inc();
}
return maybeValue;
}
Expand All @@ -142,7 +142,7 @@ public V get(final Object key) {
public V put(final K key, final V value) {
final V oldValue = delegate.put(key, value);
labelledCounter
.labels("put", String.valueOf(oldValue != null), valueFunction.apply(key, value), "put")
.labels("put", String.valueOf(oldValue != null), valueFunction.apply(key, value))
.inc();
return oldValue;
}
Expand All @@ -152,9 +152,7 @@ public V put(final K key, final V value) {
public V remove(final Object key) {
final V maybeValue = delegate.remove(key);
if (maybeValue != null) {
labelledCounter
.labels("remove", "true", valueFunction.apply((K) key, maybeValue), "remove")
.inc();
labelledCounter.labels("remove", "true", valueFunction.apply((K) key, maybeValue)).inc();
}
return maybeValue;
}
Expand Down
44 changes: 14 additions & 30 deletions storage/src/main/java/tech/pegasys/teku/storage/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class Store implements UpdatableStore {
VoteTracker[] votes;
UInt64 highestVotedValidatorIndex;
final LabelledMetric<Counter> blocksLabelledCounter;
final BiFunction<Bytes32, SignedBeaconBlock, String> blockToSlotMetrics;
final BiFunction<Bytes32, SignedBeaconBlock, String> blockToEpochMetrics;

private Store(
final MetricsSystem metricsSystem,
Expand Down Expand Up @@ -161,21 +161,20 @@ private Store(
this.blocksLabelledCounter =
metricsSystem.createLabelledCounter(
TekuMetricCategory.STORAGE,
"store_blocks_cache",
"store_block_cache",
"Number of Store blocks hits",
"type",
"success",
"slots",
"path");
this.blockToSlotMetrics =
"epoch");
this.blockToEpochMetrics =
(__, beaconBlock) -> {
int diff = spec.getCurrentSlot(this).minusMinZero(beaconBlock.getSlot()).intValue();
if (diff > 99) {
diff = 99;
}
return String.format("%02d", diff);
return String.valueOf(diff / 32);
};
this.blocks = new MeteredMap<>(blocks, blocksLabelledCounter, blockToSlotMetrics);
this.blocks = new MeteredMap<>(blocks, blocksLabelledCounter, blockToEpochMetrics);
this.highestVotedValidatorIndex =
votes.keySet().stream().max(Comparator.naturalOrder()).orElse(UInt64.ZERO);
this.votes =
Expand All @@ -198,7 +197,7 @@ private Store(
.map((b) -> Map.of(b.getRoot(), b))
.orElseGet(Collections::emptyMap)),
fromMap(this.blocks),
BlockProvider.meteredFalse(blockProvider, blocksLabelledCounter, blockToSlotMetrics));
BlockProvider.meteredFalse(blockProvider, blocksLabelledCounter, blockToEpochMetrics));
this.blobSidecarsProvider = blobSidecarsProvider;
this.earliestBlobSidecarSlotProvider = earliestBlobSidecarSlotProvider;
}
Expand All @@ -224,7 +223,11 @@ public static UpdatableStore create(

// Create limited collections for non-final data
final Map<Bytes32, SignedBeaconBlock> blocks =
LimitedMap.createSynchronized(config.getBlockCacheSize());
LimitedMap.createSortedSynchronized(
config.getBlockCacheSize(),
Comparator.naturalOrder(),
Comparator.comparing(SignedBeaconBlock::getSlot)
.thenComparing(SignedBeaconBlock::getRoot));
final CachingTaskQueue<SlotAndBlockRoot, BeaconState> checkpointStateTaskQueue =
CachingTaskQueue.create(
asyncRunner,
Expand Down Expand Up @@ -521,17 +524,6 @@ public SafeFuture<Optional<SignedBeaconBlock>> retrieveSignedBlock(final Bytes32
.getBlock(blockRoot)
.thenApply(
block -> {
if (block.isPresent()) {
blocksLabelledCounter
.labels(
"get",
"false",
blockToSlotMetrics.apply(blockRoot, block.get()),
"retrieveSignedBlock")
.inc();
} else {
blocksLabelledCounter.labels("get", "false", "-1", "retrieveSignedBlock").inc();
}
block.ifPresent(this::putBlock);
return block;
});
Expand Down Expand Up @@ -849,16 +841,8 @@ private boolean isSlotAtNthEpochBoundary(
}

private void putBlock(final SignedBeaconBlock block) {
final Lock writeLock = lock.writeLock();
writeLock.lock();
try {
if (containsBlock(block.getRoot())) {
blocks.put(block.getRoot(), block);
blockCountGauge.ifPresent(gauge -> gauge.set(blocks.size()));
}
} finally {
writeLock.unlock();
}
blockCountGauge.ifPresent(gauge -> gauge.set(blocks.size()));
// do nothing
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,20 @@
import static tech.pegasys.teku.infrastructure.time.TimeUtilities.millisToSeconds;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.tuweni.bytes.Bytes32;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.dataproviders.lookup.BlobSidecarsProvider;
import tech.pegasys.teku.dataproviders.lookup.EarliestBlobSidecarSlotProvider;
import tech.pegasys.teku.dataproviders.lookup.StateAndBlockSummaryProvider;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.collections.LimitedMap;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock;
Expand Down Expand Up @@ -297,6 +302,27 @@ public void retrieveFinalizedCheckpointAndState() {
assertThatThrownBy(result::get).hasCauseInstanceOf(InvalidCheckpointException.class);
}

@Test
public void testSortedMap() {
final Map<Bytes32, SignedBeaconBlock> blocks =
LimitedMap.createSortedSynchronized(
10,
Comparator.naturalOrder(),
Comparator.comparing(SignedBeaconBlock::getSlot)
.thenComparing(SignedBeaconBlock::getRoot));
chainBuilder.generateGenesis();
final List<SignedBlockAndState> signedBlockAndStates = chainBuilder.generateBlocksUpToSlot(100);
Collections.shuffle(signedBlockAndStates);
signedBlockAndStates.forEach(
signedBlockAndState ->
blocks.put(signedBlockAndState.getBlock().getRoot(), signedBlockAndState.getBlock()));
final Set<UInt64> actual =
blocks.values().stream().map(SignedBeaconBlock::getSlot).collect(Collectors.toSet());
final Set<UInt64> expected =
IntStream.rangeClosed(91, 100).mapToObj(UInt64::valueOf).collect(Collectors.toSet());
assertThat(actual).isEqualTo(expected);
}

private void testApplyChangesWhenTransactionCommits(final boolean withInterleavedTransaction) {
final UpdatableStore store = createGenesisStore();
final UInt64 epoch3 = UInt64.valueOf(4);
Expand Down

0 comments on commit 1cf2206

Please sign in to comment.