@@ -274,8 +274,7 @@ else if (shouldFlush(sstableRowId))
if (term.remaining() == 0 && TypeUtil.skipsEmptyValue(indexContext.getValidator()))
return false;

long allocated = currentBuilder.analyzeAndAdd(term, type, key, sstableRowId);
limiter.increment(allocated);
currentBuilder.analyzeAndAdd(term, type, key, sstableRowId);
return true;
}

@@ -301,13 +300,6 @@ private boolean shouldFlush(long sstableRowId)
private void flushSegment() throws IOException
{
currentBuilder.awaitAsyncAdditions();
if (currentBuilder.supportsAsyncAdd()
&& currentBuilder.totalBytesAllocatedConcurrent.sum() > 1.1 * currentBuilder.totalBytesAllocated())
{
logger.warn("Concurrent memory usage is higher than estimated: {} vs {}",
currentBuilder.totalBytesAllocatedConcurrent.sum(), currentBuilder.totalBytesAllocated());
}

// throw exceptions that occurred during async addInternal()
var ae = currentBuilder.getAsyncThrowable();
if (ae != null)
118 changes: 73 additions & 45 deletions src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
@@ -60,7 +60,6 @@
import org.apache.cassandra.index.sai.utils.NamedMemoryLimiter;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.index.sai.utils.TypeUtil;
import org.apache.cassandra.metrics.QuickSlidingWindowReservoir;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
import org.apache.lucene.util.BytesRef;
@@ -102,10 +101,8 @@ public abstract class SegmentBuilder
final AbstractAnalyzer analyzer;

// track memory usage for this segment so we can flush when it gets too big
private final NamedMemoryLimiter limiter;
protected final NamedMemoryLimiter limiter;
long totalBytesAllocated;
// when we're adding terms asynchronously, totalBytesAllocated will be an approximation and this tracks the exact size
final LongAdder totalBytesAllocatedConcurrent = new LongAdder();

private final long lastValidSegmentRowID;

@@ -127,7 +124,6 @@ public abstract class SegmentBuilder
protected ByteBuffer maxTerm;

protected final AtomicInteger updatesInFlight = new AtomicInteger(0);
protected final QuickSlidingWindowReservoir termSizeReservoir = new QuickSlidingWindowReservoir(100);
protected AtomicReference<Throwable> asyncThrowable = new AtomicReference<>();


@@ -151,7 +147,7 @@ public static class KDTreeSegmentBuilder extends SegmentBuilder
this.buffer = new byte[typeSize];
this.indexWriterConfig = indexWriterConfig;
totalBytesAllocated = kdTreeRamBuffer.ramBytesUsed();
totalBytesAllocatedConcurrent.add(totalBytesAllocated);}
}

public boolean isEmpty()
{
@@ -200,7 +196,6 @@ public static class RAMStringSegmentBuilder extends SegmentBuilder
this.byteComparableVersion = components.byteComparableVersionFor(IndexComponentType.TERMS_DATA);
ramIndexer = new RAMStringIndexer(writeFrequencies());
totalBytesAllocated = ramIndexer.estimatedBytesUsed();
totalBytesAllocatedConcurrent.add(totalBytesAllocated);
}

private boolean writeFrequencies()
@@ -247,6 +242,8 @@ public boolean requiresFlush()
public static class VectorOffHeapSegmentBuilder extends SegmentBuilder
{
private final CompactionGraph graphIndex;
protected final AtomicLong maxBytesAddedObserved = new AtomicLong(0);
protected final LongAdder reconciliationBytes = new LongAdder();

public VectorOffHeapSegmentBuilder(IndexComponents.ForWrite components,
long rowIdOffset,
@@ -266,12 +263,12 @@ public VectorOffHeapSegmentBuilder(IndexComponents.ForWrite components,
throw new UncheckedIOException(e);
}
totalBytesAllocated = graphIndex.ramBytesUsed();
totalBytesAllocatedConcurrent.add(totalBytesAllocated);
}

@Override
public boolean isEmpty()
{
// Don't need to check updatesInFlight because we add the vector to the graphIndex before dispatching the task.
return graphIndex.isEmpty();
}

@@ -282,7 +279,7 @@ protected long addInternal(List<ByteBuffer> terms, int segmentRowId)
}

@Override
protected long addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
protected void addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
{
assert terms.size() == 1;

@@ -293,21 +290,34 @@ protected long addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
try
{
result = graphIndex.maybeAddVector(terms.get(0), segmentRowId);
observeAllocatedBytes(result.bytesUsed);
}
catch (IOException e)
{
throw new UncheckedIOException(e);
}
if (result.vector == null)
return result.bytesUsed;
return;

updatesInFlight.incrementAndGet();

// Increment by double the maximum observed bytes before dispatching the task to prevent a subsequent
// insertion from exceeding the limit. Also add back any bytes that were reconciled by previous calls
// that have since completed. The reconciliation process typically results in a net reduction because
// we overestimate the number of bytes required per insertion, so a lazy reconciliation process is
// acceptable.
long estimatedBytes = maxBytesAddedObserved.get() * 2;
long reconciledBytes = reconciliationBytes.sumThenReset();
observeAllocatedBytes(estimatedBytes + reconciledBytes);

compactionExecutor.submit(() -> {
try
{
long bytesAdded = result.bytesUsed + graphIndex.addGraphNode(result);
totalBytesAllocatedConcurrent.add(bytesAdded);
termSizeReservoir.update(bytesAdded);
long bytesAdded = graphIndex.addGraphNode(result);
maxBytesAddedObserved.accumulateAndGet(bytesAdded, Math::max);
// Store the difference between the estimated and actual bytes added for correction on the
// next call to addInternalAsync.
reconciliationBytes.add(bytesAdded - estimatedBytes);
}
catch (Throwable th)
{
@@ -318,15 +328,10 @@ protected long addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
updatesInFlight.decrementAndGet();
}
});
// bytes allocated will be approximated immediately as the average of recently added terms,
// rather than waiting until the async update completes to get the exact value. The latter could
// result in a dangerously large discrepancy between the amount of memory actually consumed
// and the amount the limiter knows about if the queue depth grows.
busyWaitWhile(() -> termSizeReservoir.size() == 0 && asyncThrowable.get() == null);

if (asyncThrowable.get() != null) {
throw new RuntimeException("Error adding term asynchronously", asyncThrowable.get());
}
return (long) termSizeReservoir.getMean();
}
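The comment inside addInternalAsync above describes a pre-charge-and-reconcile scheme: each async insert is charged against the limiter at twice the largest insertion observed so far, and the signed difference between that estimate and the real cost is folded back in lazily, either on the next insert or once the queue drains. A minimal, self-contained sketch of the idea follows; the class and method names are hypothetical and a plain LongAdder stands in for NamedMemoryLimiter.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical illustration of the pre-charge/reconcile accounting pattern.
class LazyReconcilingAccounting
{
    private final AtomicLong maxObserved = new AtomicLong(0);  // largest real insertion seen so far
    private final LongAdder pendingDelta = new LongAdder();    // actual - estimated, per completed insert
    private final LongAdder limiter = new LongAdder();         // stand-in for NamedMemoryLimiter

    // Called on the producer thread before dispatching an async insert.
    long precharge()
    {
        long estimate = maxObserved.get() * 2;                  // over-estimate so the next insert cannot blow the limit
        limiter.add(estimate + pendingDelta.sumThenReset());     // fold in corrections from completed inserts
        return estimate;
    }

    // Called on the worker thread once the real cost is known.
    void complete(long actualBytes, long estimate)
    {
        maxObserved.accumulateAndGet(actualBytes, Math::max);
        pendingDelta.add(actualBytes - estimate);                // usually negative, reclaimed on the next precharge
    }

    // Called once all async work has drained, e.g. from awaitAsyncAdditions().
    void reconcile()
    {
        limiter.add(pendingDelta.sumThenReset());
    }

    long accountedBytes()
    {
        return limiter.sum();
    }
}
```

In this sketch the producer calls precharge() before submitting work, the worker calls complete() with the measured cost, and a final reconcile() after the queue drains (mirroring what awaitAsyncAdditions() does via reconcileAsyncByteAllocations()) brings the accounted total back in line with actual usage.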

@Override
Expand All @@ -344,6 +349,12 @@ public boolean supportsAsyncAdd()
return true;
}

@Override
public void reconcileAsyncByteAllocations()
{
observeAllocatedBytes(reconciliationBytes.sumThenReset());
}

@Override
public boolean requiresFlush()
{
@@ -368,19 +379,21 @@ long release(IndexContext indexContext)
public static class VectorOnHeapSegmentBuilder extends SegmentBuilder
{
private final CassandraOnHeapGraph<Integer> graphIndex;
protected final AtomicLong maxBytesAddedObserved = new AtomicLong(0);
protected final LongAdder reconciliationBytes = new LongAdder();

public VectorOnHeapSegmentBuilder(IndexComponents.ForWrite components, long rowIdOffset, long keyCount, NamedMemoryLimiter limiter)
{
super(components, rowIdOffset, limiter);
graphIndex = new CassandraOnHeapGraph<>(components.context(), false, null);
totalBytesAllocated = graphIndex.ramBytesUsed();
totalBytesAllocatedConcurrent.add(totalBytesAllocated);
}

@Override
public boolean isEmpty()
{
return graphIndex.isEmpty();
// Must check updatesInFlight first to avoid a race between addInternalAsync() and graphIndex.isEmpty().
return updatesInFlight.get() == 0 && graphIndex.isEmpty();
}

@Override
@@ -391,15 +404,21 @@ protected long addInternal(List<ByteBuffer> terms, int segmentRowId)
}

@Override
protected long addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
protected void addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
{
updatesInFlight.incrementAndGet();
// See VectorOffHeapSegmentBuilder for comments on this logic
long estimatedBytes = maxBytesAddedObserved.get() * 2;
long reconciledBytes = reconciliationBytes.sumThenReset();
observeAllocatedBytes(estimatedBytes + reconciledBytes);
compactionExecutor.submit(() -> {
try
{
long bytesAdded = addInternal(terms, segmentRowId);
totalBytesAllocatedConcurrent.add(bytesAdded);
termSizeReservoir.update(bytesAdded);
maxBytesAddedObserved.accumulateAndGet(bytesAdded, Math::max);
// Store the difference between the estimated and actual bytes added for correction on the
// next call to addInternalAsync.
reconciliationBytes.add(bytesAdded - estimatedBytes);
}
catch (Throwable th)
{
@@ -410,15 +429,9 @@ protected long addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
updatesInFlight.decrementAndGet();
}
});
// bytes allocated will be approximated immediately as the average of recently added terms,
// rather than waiting until the async update completes to get the exact value. The latter could
// result in a dangerously large discrepancy between the amount of memory actually consumed
// and the amount the limiter knows about if the queue depth grows.
busyWaitWhile(() -> termSizeReservoir.size() == 0 && asyncThrowable.get() == null);
if (asyncThrowable.get() != null) {
throw new RuntimeException("Error adding term asynchronously", asyncThrowable.get());
}
return (long) termSizeReservoir.getMean();
}

@Override
Expand All @@ -437,6 +450,12 @@ public boolean supportsAsyncAdd()
{
return true;
}

@Override
public void reconcileAsyncByteAllocations()
{
observeAllocatedBytes(reconciliationBytes.sumThenReset());
}
}

private SegmentBuilder(IndexComponents.ForWrite components, long rowIdOffset, NamedMemoryLimiter limiter)
@@ -452,6 +471,15 @@ private SegmentBuilder(IndexComponents.ForWrite components, long rowIdOffset, Na
minimumFlushBytes = limiter.limitBytes() / ACTIVE_BUILDER_COUNT.getAndIncrement();
}

protected void observeAllocatedBytes(long bytes)
{
if (bytes != 0)
{
totalBytesAllocated += bytes;
limiter.increment(bytes);
}
}

public SegmentMetadata flush() throws IOException
{
assert !flushed;
@@ -474,27 +502,25 @@ public SegmentMetadata flush() throws IOException
return metadataBuilder.build();
}

public long analyzeAndAdd(ByteBuffer rawTerm, AbstractType<?> type, PrimaryKey key, long sstableRowId)
public void analyzeAndAdd(ByteBuffer rawTerm, AbstractType<?> type, PrimaryKey key, long sstableRowId)
{
long totalSize = 0;
if (TypeUtil.isLiteral(type))
{
var terms = ByteLimitedMaterializer.materializeTokens(analyzer, rawTerm, components.context(), key);
totalSize += add(terms, key, sstableRowId);
add(terms, key, sstableRowId);
totalTermCount += terms.size();
}
else
{
totalSize += add(List.of(rawTerm), key, sstableRowId);
add(List.of(rawTerm), key, sstableRowId);
totalTermCount++;
}
return totalSize;
}

private long add(List<ByteBuffer> terms, PrimaryKey key, long sstableRowId)
private void add(List<ByteBuffer> terms, PrimaryKey key, long sstableRowId)
{
if (terms.isEmpty())
return 0;
return;

Preconditions.checkState(!flushed, "Cannot add to flushed segment");
Preconditions.checkArgument(sstableRowId >= maxSSTableRowId,
@@ -527,23 +553,20 @@ private long add(List<ByteBuffer> terms, PrimaryKey key, long sstableRowId)

maxSegmentRowId = Math.max(maxSegmentRowId, segmentRowId);

long bytesAllocated;
if (supportsAsyncAdd())
{
// only vector indexing is done async and there can only be one term
assert terms.size() == 1;
bytesAllocated = addInternalAsync(terms, segmentRowId);
addInternalAsync(terms, segmentRowId);
}
else
{
bytesAllocated = addInternal(terms, segmentRowId);
long bytesAllocated = addInternal(terms, segmentRowId);
observeAllocatedBytes(bytesAllocated);
}

totalBytesAllocated += bytesAllocated;
return bytesAllocated;
}

protected long addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
protected void addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
{
throw new UnsupportedOperationException();
}
@@ -552,6 +575,10 @@ public boolean supportsAsyncAdd() {
return false;
}

protected void reconcileAsyncByteAllocations()
{
}

public Throwable getAsyncThrowable()
{
return asyncThrowable.get();
@@ -561,7 +588,8 @@ public void awaitAsyncAdditions()
{
// addTerm is only called by the compaction thread, serially, so we don't need to worry about new
// terms being added while we're waiting -- updatesInFlight can only decrease
busyWaitWhile(() -> updatesInFlight.get() > 0);
busyWaitWhile(() -> updatesInFlight.get() > 0, 60_000);
reconcileAsyncByteAllocations();
}

long totalBytesAllocated()
4 changes: 3 additions & 1 deletion src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -1396,10 +1396,12 @@ public static String getStackTrace(ThreadInfo threadInfo)
}
}

public static void busyWaitWhile(Supplier<Boolean> condition)
public static void busyWaitWhile(Supplier<Boolean> condition, int timeoutMs)
{
while (condition.get())
{
if (timeoutMs-- <= 0)
throw new RuntimeException("Timeout while waiting for condition");

Review comment: Do we have unit tests that cover this condition? A RuntimeException thrown here will bubble up and can surface anywhere in the calling code. Would it be better to return false and handle the timeout explicitly?

try
{
Thread.sleep(1);
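As an illustration of the alternative the reviewer raises, a variant that reports the timeout instead of throwing might look like the sketch below. This is not part of the patch; the wrapper class name and the switch to a long timeout parameter are assumptions made for the example.

```java
import java.util.function.Supplier;

public final class BusyWaitSketch
{
    // Hypothetical variant of busyWaitWhile that reports a timeout instead of throwing,
    // leaving the caller to decide how to react (log, throw with context, etc.).
    public static boolean busyWaitWhile(Supplier<Boolean> condition, long timeoutMs)
    {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (condition.get())
        {
            if (System.nanoTime() >= deadline)
                return false; // timed out while the condition still held
            try
            {
                Thread.sleep(1);
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                return false; // surface interruption to the caller as well
            }
        }
        return true; // condition became false before the deadline
    }
}
```

A caller such as awaitAsyncAdditions() could then log the stuck state or throw an exception carrying index-specific context instead of an anonymous RuntimeException.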
@@ -155,6 +155,7 @@ private void testFlushBetweenRowIds(long sstableRowId1, long sstableRowId2, int
MockSchema.newCFS("ks"));

IndexComponents.ForWrite components = indexDescriptor.newPerIndexComponentsForWrite(indexContext);
long startingLimiterValue = V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMITER.currentBytesUsed();
SSTableIndexWriter writer = new SSTableIndexWriter(components, V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMITER, () -> false, () -> false, 2);

List<DecoratedKey> keys = Arrays.asList(dk("1"), dk("2"));
@@ -172,6 +173,7 @@ private void testFlushBetweenRowIds(long sstableRowId1, long sstableRowId2, int
writer.addRow(SAITester.TEST_FACTORY.create(key2, Clustering.EMPTY), row2, sstableRowId2);

writer.complete(Stopwatch.createStarted());
assertEquals(startingLimiterValue, V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMITER.currentBytesUsed());

MetadataSource source = MetadataSource.loadMetadata(components);
