diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java index 7989ecb9086c..332ba682ea86 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java @@ -274,8 +274,7 @@ else if (shouldFlush(sstableRowId)) if (term.remaining() == 0 && TypeUtil.skipsEmptyValue(indexContext.getValidator())) return false; - long allocated = currentBuilder.analyzeAndAdd(term, type, key, sstableRowId); - limiter.increment(allocated); + currentBuilder.analyzeAndAdd(term, type, key, sstableRowId); return true; } @@ -301,13 +300,6 @@ private boolean shouldFlush(long sstableRowId) private void flushSegment() throws IOException { currentBuilder.awaitAsyncAdditions(); - if (currentBuilder.supportsAsyncAdd() - && currentBuilder.totalBytesAllocatedConcurrent.sum() > 1.1 * currentBuilder.totalBytesAllocated()) - { - logger.warn("Concurrent memory usage is higher than estimated: {} vs {}", - currentBuilder.totalBytesAllocatedConcurrent.sum(), currentBuilder.totalBytesAllocated()); - } - // throw exceptions that occurred during async addInternal() var ae = currentBuilder.getAsyncThrowable(); if (ae != null) diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java index 320f59631c84..6bccfab2991b 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java @@ -60,7 +60,6 @@ import org.apache.cassandra.index.sai.utils.NamedMemoryLimiter; import org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.index.sai.utils.TypeUtil; -import org.apache.cassandra.metrics.QuickSlidingWindowReservoir; import org.apache.cassandra.utils.bytecomparable.ByteComparable; import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse; import org.apache.lucene.util.BytesRef; @@ -102,10 +101,8 @@ public abstract class SegmentBuilder final AbstractAnalyzer analyzer; // track memory usage for this segment so we can flush when it gets too big - private final NamedMemoryLimiter limiter; + protected final NamedMemoryLimiter limiter; long totalBytesAllocated; - // when we're adding terms asynchronously, totalBytesAllocated will be an approximation and this tracks the exact size - final LongAdder totalBytesAllocatedConcurrent = new LongAdder(); private final long lastValidSegmentRowID; @@ -127,7 +124,6 @@ public abstract class SegmentBuilder protected ByteBuffer maxTerm; protected final AtomicInteger updatesInFlight = new AtomicInteger(0); - protected final QuickSlidingWindowReservoir termSizeReservoir = new QuickSlidingWindowReservoir(100); protected AtomicReference asyncThrowable = new AtomicReference<>(); @@ -151,7 +147,7 @@ public static class KDTreeSegmentBuilder extends SegmentBuilder this.buffer = new byte[typeSize]; this.indexWriterConfig = indexWriterConfig; totalBytesAllocated = kdTreeRamBuffer.ramBytesUsed(); - totalBytesAllocatedConcurrent.add(totalBytesAllocated);} + } public boolean isEmpty() { @@ -200,7 +196,6 @@ public static class RAMStringSegmentBuilder extends SegmentBuilder this.byteComparableVersion = components.byteComparableVersionFor(IndexComponentType.TERMS_DATA); ramIndexer = new RAMStringIndexer(writeFrequencies()); totalBytesAllocated = ramIndexer.estimatedBytesUsed(); - totalBytesAllocatedConcurrent.add(totalBytesAllocated); } private boolean writeFrequencies() @@ -247,6 +242,8 @@ public boolean requiresFlush() public static class VectorOffHeapSegmentBuilder extends SegmentBuilder { private final CompactionGraph graphIndex; + protected final AtomicLong maxBytesAddedObserved = new AtomicLong(0); + protected final LongAdder reconciliationBytes = new LongAdder(); public VectorOffHeapSegmentBuilder(IndexComponents.ForWrite components, long rowIdOffset, @@ -266,12 +263,12 @@ public VectorOffHeapSegmentBuilder(IndexComponents.ForWrite components, throw new UncheckedIOException(e); } totalBytesAllocated = graphIndex.ramBytesUsed(); - totalBytesAllocatedConcurrent.add(totalBytesAllocated); } @Override public boolean isEmpty() { + // Don't need to check updatesInFlight because we add the vector to the graphIndex before dispatching the task. return graphIndex.isEmpty(); } @@ -282,7 +279,7 @@ protected long addInternal(List terms, int segmentRowId) } @Override - protected long addInternalAsync(List terms, int segmentRowId) + protected void addInternalAsync(List terms, int segmentRowId) { assert terms.size() == 1; @@ -293,21 +290,34 @@ protected long addInternalAsync(List terms, int segmentRowId) try { result = graphIndex.maybeAddVector(terms.get(0), segmentRowId); + observeAllocatedBytes(result.bytesUsed); } catch (IOException e) { throw new UncheckedIOException(e); } if (result.vector == null) - return result.bytesUsed; + return; updatesInFlight.incrementAndGet(); + + // Increment by double max bytes before dispatching the task to avoid a subsequent insertion from + // exceeding the limit. Also add back any bytes that were reconciled previous calls that have since + // completed. It is expected that the reconciliation process typically results in a net reduction + // because we overestimate the number of bytes required per insertion, and as such, we allow for a lazy + // reconciliation process. + long estimatedBytes = maxBytesAddedObserved.get() * 2; + long reconciledBytes = reconciliationBytes.sumThenReset(); + observeAllocatedBytes(estimatedBytes + reconciledBytes); + compactionExecutor.submit(() -> { try { - long bytesAdded = result.bytesUsed + graphIndex.addGraphNode(result); - totalBytesAllocatedConcurrent.add(bytesAdded); - termSizeReservoir.update(bytesAdded); + long bytesAdded = graphIndex.addGraphNode(result); + maxBytesAddedObserved.accumulateAndGet(bytesAdded, Math::max); + // Store the difference between the estimated and actual bytes added for correction on the + // next call to addInternalAsync. + reconciliationBytes.add(bytesAdded - estimatedBytes); } catch (Throwable th) { @@ -318,15 +328,10 @@ protected long addInternalAsync(List terms, int segmentRowId) updatesInFlight.decrementAndGet(); } }); - // bytes allocated will be approximated immediately as the average of recently added terms, - // rather than waiting until the async update completes to get the exact value. The latter could - // result in a dangerously large discrepancy between the amount of memory actually consumed - // and the amount the limiter knows about if the queue depth grows. - busyWaitWhile(() -> termSizeReservoir.size() == 0 && asyncThrowable.get() == null); + if (asyncThrowable.get() != null) { throw new RuntimeException("Error adding term asynchronously", asyncThrowable.get()); } - return (long) termSizeReservoir.getMean(); } @Override @@ -344,6 +349,12 @@ public boolean supportsAsyncAdd() return true; } + @Override + public void reconcileAsyncByteAllocations() + { + observeAllocatedBytes(reconciliationBytes.sumThenReset()); + } + @Override public boolean requiresFlush() { @@ -368,19 +379,21 @@ long release(IndexContext indexContext) public static class VectorOnHeapSegmentBuilder extends SegmentBuilder { private final CassandraOnHeapGraph graphIndex; + protected final AtomicLong maxBytesAddedObserved = new AtomicLong(0); + protected final LongAdder reconciliationBytes = new LongAdder(); public VectorOnHeapSegmentBuilder(IndexComponents.ForWrite components, long rowIdOffset, long keyCount, NamedMemoryLimiter limiter) { super(components, rowIdOffset, limiter); graphIndex = new CassandraOnHeapGraph<>(components.context(), false, null); totalBytesAllocated = graphIndex.ramBytesUsed(); - totalBytesAllocatedConcurrent.add(totalBytesAllocated); } @Override public boolean isEmpty() { - return graphIndex.isEmpty(); + // Must check updatesInFlight first to avoid a race with addInternalAsync and graphIndex.isEmpty(). + return updatesInFlight.get() == 0 && graphIndex.isEmpty(); } @Override @@ -391,15 +404,21 @@ protected long addInternal(List terms, int segmentRowId) } @Override - protected long addInternalAsync(List terms, int segmentRowId) + protected void addInternalAsync(List terms, int segmentRowId) { updatesInFlight.incrementAndGet(); + // See VectorOffHeapSegmentBuilder for comments on this logic + long estimatedBytes = maxBytesAddedObserved.get() * 2; + long reconciledBytes = reconciliationBytes.sumThenReset(); + observeAllocatedBytes(estimatedBytes + reconciledBytes); compactionExecutor.submit(() -> { try { long bytesAdded = addInternal(terms, segmentRowId); - totalBytesAllocatedConcurrent.add(bytesAdded); - termSizeReservoir.update(bytesAdded); + maxBytesAddedObserved.accumulateAndGet(bytesAdded, Math::max); + // Store the difference between the estimated and actual bytes added for correction on the + // next call to addInternalAsync. + reconciliationBytes.add(bytesAdded - estimatedBytes); } catch (Throwable th) { @@ -410,15 +429,9 @@ protected long addInternalAsync(List terms, int segmentRowId) updatesInFlight.decrementAndGet(); } }); - // bytes allocated will be approximated immediately as the average of recently added terms, - // rather than waiting until the async update completes to get the exact value. The latter could - // result in a dangerously large discrepancy between the amount of memory actually consumed - // and the amount the limiter knows about if the queue depth grows. - busyWaitWhile(() -> termSizeReservoir.size() == 0 && asyncThrowable.get() == null); if (asyncThrowable.get() != null) { throw new RuntimeException("Error adding term asynchronously", asyncThrowable.get()); } - return (long) termSizeReservoir.getMean(); } @Override @@ -437,6 +450,12 @@ public boolean supportsAsyncAdd() { return true; } + + @Override + public void reconcileAsyncByteAllocations() + { + observeAllocatedBytes(reconciliationBytes.sumThenReset()); + } } private SegmentBuilder(IndexComponents.ForWrite components, long rowIdOffset, NamedMemoryLimiter limiter) @@ -452,6 +471,15 @@ private SegmentBuilder(IndexComponents.ForWrite components, long rowIdOffset, Na minimumFlushBytes = limiter.limitBytes() / ACTIVE_BUILDER_COUNT.getAndIncrement(); } + protected void observeAllocatedBytes(long bytes) + { + if (bytes != 0) + { + totalBytesAllocated += bytes; + limiter.increment(bytes); + } + } + public SegmentMetadata flush() throws IOException { assert !flushed; @@ -474,27 +502,25 @@ public SegmentMetadata flush() throws IOException return metadataBuilder.build(); } - public long analyzeAndAdd(ByteBuffer rawTerm, AbstractType type, PrimaryKey key, long sstableRowId) + public void analyzeAndAdd(ByteBuffer rawTerm, AbstractType type, PrimaryKey key, long sstableRowId) { - long totalSize = 0; if (TypeUtil.isLiteral(type)) { var terms = ByteLimitedMaterializer.materializeTokens(analyzer, rawTerm, components.context(), key); - totalSize += add(terms, key, sstableRowId); + add(terms, key, sstableRowId); totalTermCount += terms.size(); } else { - totalSize += add(List.of(rawTerm), key, sstableRowId); + add(List.of(rawTerm), key, sstableRowId); totalTermCount++; } - return totalSize; } - private long add(List terms, PrimaryKey key, long sstableRowId) + private void add(List terms, PrimaryKey key, long sstableRowId) { if (terms.isEmpty()) - return 0; + return; Preconditions.checkState(!flushed, "Cannot add to flushed segment"); Preconditions.checkArgument(sstableRowId >= maxSSTableRowId, @@ -527,23 +553,20 @@ private long add(List terms, PrimaryKey key, long sstableRowId) maxSegmentRowId = Math.max(maxSegmentRowId, segmentRowId); - long bytesAllocated; if (supportsAsyncAdd()) { // only vector indexing is done async and there can only be one term assert terms.size() == 1; - bytesAllocated = addInternalAsync(terms, segmentRowId); + addInternalAsync(terms, segmentRowId); } else { - bytesAllocated = addInternal(terms, segmentRowId); + long bytesAllocated = addInternal(terms, segmentRowId); + observeAllocatedBytes(bytesAllocated); } - - totalBytesAllocated += bytesAllocated; - return bytesAllocated; } - protected long addInternalAsync(List terms, int segmentRowId) + protected void addInternalAsync(List terms, int segmentRowId) { throw new UnsupportedOperationException(); } @@ -552,6 +575,10 @@ public boolean supportsAsyncAdd() { return false; } + protected void reconcileAsyncByteAllocations() + { + } + public Throwable getAsyncThrowable() { return asyncThrowable.get(); @@ -561,7 +588,8 @@ public void awaitAsyncAdditions() { // addTerm is only called by the compaction thread, serially, so we don't need to worry about new // terms being added while we're waiting -- updatesInFlight can only decrease - busyWaitWhile(() -> updatesInFlight.get() > 0); + busyWaitWhile(() -> updatesInFlight.get() > 0, 60_000); + reconcileAsyncByteAllocations(); } long totalBytesAllocated() diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index 1bb8677f5cb0..2caa3accf04c 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -1396,10 +1396,12 @@ public static String getStackTrace(ThreadInfo threadInfo) } } - public static void busyWaitWhile(Supplier condition) + public static void busyWaitWhile(Supplier condition, int timeoutMs) { while (condition.get()) { + if (timeoutMs-- <= 0) + throw new RuntimeException("Timeout while waiting for condition"); try { Thread.sleep(1); diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java index 557f433df267..16cd43957ad7 100644 --- a/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java +++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java @@ -155,6 +155,7 @@ private void testFlushBetweenRowIds(long sstableRowId1, long sstableRowId2, int MockSchema.newCFS("ks")); IndexComponents.ForWrite components = indexDescriptor.newPerIndexComponentsForWrite(indexContext); + long startingLimiterValue = V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMITER.currentBytesUsed(); SSTableIndexWriter writer = new SSTableIndexWriter(components, V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMITER, () -> false, () -> false, 2); List keys = Arrays.asList(dk("1"), dk("2")); @@ -172,6 +173,7 @@ private void testFlushBetweenRowIds(long sstableRowId1, long sstableRowId2, int writer.addRow(SAITester.TEST_FACTORY.create(key2, Clustering.EMPTY), row2, sstableRowId2); writer.complete(Stopwatch.createStarted()); + assertEquals(startingLimiterValue, V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMITER.currentBytesUsed()); MetadataSource source = MetadataSource.loadMetadata(components);