Skip to content

Commit 9a71175

Browse files
committed
HCD-209: Fix queries on early-open BTI files
Change updateFileHandle to also take care of the file length. Also includes the fix for CASSANDRA-20976.
1 parent 8277624 commit 9a71175

File tree

14 files changed

+286
-35
lines changed

14 files changed

+286
-35
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ Future version (tbd)
33
Merged from 5.1:
44
* Expose current compaction throughput in nodetool (CASSANDRA-13890)
55
Merged from 5.0:
6+
* Fix range queries on early-open BTI files (CASSANDRA-20976)
67
* Improve error messages when initializing auth classes (CASSANDRA-20368 and CASSANDRA-20450)
78
* Use ParameterizedClass for all auth-related implementations (CASSANDRA-19946 and partially CASSANDRA-18554)
89
* Enables IAuthenticator's to return own AuthenticateMessage (CASSANDRA-19984)

src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,11 +221,14 @@ protected void flushData()
221221
runPostFlush.run();
222222
}
223223

224-
public void updateFileHandle(FileHandle.Builder fhBuilder, long dataLength)
224+
public FileHandle.Builder updateFileHandle(FileHandle.Builder fhBuilder, long dataLength)
225225
{
226226
long length = dataLength > 0 ? dataLength : lastFlushOffset;
227227
if (length > 0)
228-
fhBuilder.withCompressionMetadata(metadataWriter.open(length, chunkOffset));
228+
return fhBuilder.withLength(-1) // get length from compression metadata
229+
.withCompressionMetadata(metadataWriter.open(length, chunkOffset));
230+
else
231+
return fhBuilder.withLength(length); // 0 or -1
229232
}
230233

231234
@Override

src/java/org/apache/cassandra/io/compress/EncryptedSequentialWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,10 +178,10 @@ protected void resetBuffer()
178178
buffer.clear();
179179
}
180180

181-
public void updateFileHandle(FileHandle.Builder fhBuilder, long dataLength)
181+
public FileHandle.Builder updateFileHandle(FileHandle.Builder fhBuilder, long dataLength)
182182
{
183183
// Set length to last content position to avoid having to read and decrypt the last chunk to find it.
184-
fhBuilder.withLength(lastContent);
184+
return fhBuilder.withLength(lastContent);
185185
}
186186

187187
@Override

src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,9 @@ protected SSTableReader openFinal(SSTableReader.OpenReason reason, StorageHandle
358358
// need to move this inside the `try` so that the `storageHandler` callback can intercept reading issues.
359359
// Which would imply being able to get at the compressed/uncompressed sizes upfront (directly from the
360360
// writer, without reading the compression metadata written file) in some other way.
361-
dataFile.updateFileHandle(dbuilder);
362-
363-
FileHandle dfile = dbuilder.bufferSize(dataBufferSize).complete();
361+
FileHandle dfile = dataFile.updateFileHandle(dbuilder)
362+
.bufferSize(dataBufferSize)
363+
.complete();
364364
invalidateCacheAtPreviousBoundary(dfile, Long.MAX_VALUE);
365365

366366
DecoratedKey firstMinimized = getMinimalKey(first);

src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,15 @@ public boolean openEarly(Consumer<SSTableReader> callWhenReady)
204204
IndexSummary indexSummary = iwriter.summary.build(metadata().partitioner, boundary);
205205
long indexFileLength = descriptor.fileFor(Component.PRIMARY_INDEX).length();
206206
int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
207-
iwriter.indexFile.updateFileHandle(iwriter.builder);
208-
FileHandle ifile = iwriter.builder.bufferSize(indexBufferSize).withLength(boundary.indexLength).complete();
209-
dataFile.updateFileHandle(dbuilder, boundary.dataLength);
207+
FileHandle ifile = iwriter.indexFile.updateFileHandle(iwriter.builder)
208+
.bufferSize(indexBufferSize)
209+
.withLength(boundary.indexLength)
210+
.complete();
210211
int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
211-
FileHandle dfile = dbuilder.bufferSize(dataBufferSize).withLength(boundary.dataLength).complete();
212+
FileHandle dfile = dataFile.updateFileHandle(dbuilder, boundary.dataLength)
213+
.bufferSize(dataBufferSize)
214+
.withLength(boundary.dataLength)
215+
.complete();
212216
invalidateCacheAtPreviousBoundary(dfile, boundary.dataLength);
213217
SSTableReader sstable = BigTableReader.internalOpen(descriptor,
214218
components(), metadata,
@@ -244,8 +248,7 @@ protected SSTableReader openReader(SSTableReader.OpenReason reason, FileHandle d
244248
IndexSummary indexSummary = iwriter.summary.build(metadata().partitioner);
245249
long indexFileLength = descriptor.fileFor(Component.PRIMARY_INDEX).length();
246250
int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
247-
iwriter.indexFile.updateFileHandle(iwriter.builder);
248-
FileHandle ifile = iwriter.builder.bufferSize(indexBufferSize).complete();
251+
FileHandle ifile = iwriter.indexFile.updateFileHandle(iwriter.builder).bufferSize(indexBufferSize).complete();
249252
return SSTableReader.internalOpen(descriptor,
250253
components(),
251254
metadata,

src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndex.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public PartitionIndex(FileHandle fh, long trieRoot, long keyCount, DecoratedKey
9494
this.version = version;
9595
}
9696

97-
private PartitionIndex(PartitionIndex src)
97+
protected PartitionIndex(PartitionIndex src)
9898
{
9999
this(src.fh, src.root, src.keyCount, src.first, src.last, src.filterFirst, src.filterLast, src.version);
100100
}

src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndexBuilder.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,7 @@ private void refreshReadableBoundary()
110110
if (partitionIndexSyncPosition < partialIndexPartitionEnd)
111111
return;
112112

113-
writer.updateFileHandle(fhBuilder);
114-
try (FileHandle fh = fhBuilder.withLength(writer.getLastFlushOffset()).complete())
113+
try (FileHandle fh = writer.updateFileHandle(fhBuilder).complete())
115114
{
116115
PartitionIndex pi = new PartitionIndexEarly(fh,
117116
partialIndexTail.root(),
@@ -187,7 +186,6 @@ public long complete() throws IOException
187186
writer.writeLong(root);
188187

189188
writer.sync();
190-
fhBuilder.withLength(writer.getLastFlushOffset());
191189
writer.updateFileHandle(fhBuilder);
192190

193191
return root;

src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndexEarly.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,19 @@ public PartitionIndexEarly(FileHandle fh, long trieRoot, long keyCount, Decorate
4343
this.tail = tail;
4444
}
4545

46+
protected PartitionIndexEarly(PartitionIndexEarly partitionIndexEarly)
47+
{
48+
super(partitionIndexEarly);
49+
this.cutoff = partitionIndexEarly.cutoff;
50+
this.tail = partitionIndexEarly.tail;
51+
}
52+
53+
@Override
54+
public PartitionIndex sharedCopy()
55+
{
56+
return new PartitionIndexEarly(this);
57+
}
58+
4659
@Override
4760
protected Rebufferer instantiateRebufferer()
4861
{

src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIterator.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,21 @@ class PartitionIterator extends PartitionIndex.IndexPosIterator implements Parti
6464
this.rowIndexFile = rowIndexFile;
6565
this.dataFile = dataFile;
6666

67-
readNext();
68-
// first value can be off
69-
if (nextKey != null && !(nextKey.compareTo(left) > inclusiveLeft))
67+
try
7068
{
7169
readNext();
70+
// first value can be off
71+
if (nextKey != null && !(nextKey.compareTo(left) > inclusiveLeft))
72+
{
73+
readNext();
74+
}
75+
advance();
76+
}
77+
catch (Throwable t)
78+
{
79+
super.close();
80+
throw t;
7281
}
73-
advance();
7482
}
7583

7684
/**

src/java/org/apache/cassandra/io/sstable/format/trieindex/TrieIndexSSTableWriter.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -204,12 +204,13 @@ public boolean openEarly(Consumer<SSTableReader> callWhenReady)
204204
StatsMetadata stats = (StatsMetadata) finalMetadata.get(MetadataType.STATS);
205205
CompactionMetadata compactionMetadata = (CompactionMetadata) finalMetadata.get(MetadataType.COMPACTION);
206206

207-
FileHandle ifile = iwriter.rowIndexFHBuilder.withLength(iwriter.rowIndexFile.getLastFlushOffset()).complete();
208-
// With trie indices it is no longer necessary to limit the file size; just make sure indices and data
209-
// get updated length / compression metadata.
210-
dataFile.updateFileHandle(dbuilder, dataLength);
207+
FileHandle ifile = iwriter.rowIndexFile.updateFileHandle(iwriter.rowIndexFHBuilder)
208+
.complete();
211209
int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
212-
FileHandle dfile = dbuilder.bufferSize(dataBufferSize).withLength(dataLength).complete();
210+
FileHandle dfile = dataFile.updateFileHandle(dbuilder, dataLength)
211+
.bufferSize(dataBufferSize)
212+
.withLength(dataLength)
213+
.complete();
213214
invalidateCacheAtPreviousBoundary(dfile, dataLength);
214215
SSTableReader sstable = TrieIndexSSTableReader.internalOpen(descriptor,
215216
components(), metadata,
@@ -232,6 +233,7 @@ public SSTableReader openFinalEarly()
232233
// ensure outstanding openEarly actions are not triggered.
233234
dataFile.sync();
234235
iwriter.rowIndexFile.sync();
236+
iwriter.rowIndexFile.updateFileHandle(iwriter.rowIndexFHBuilder);
235237
// Note: Nothing must be written to any of the files after this point, as the chunk cache could pick up and
236238
// retain a partially-written page (see DB-2446).
237239

@@ -397,7 +399,6 @@ public long append(DecoratedKey key, RowIndexEntry indexEntry) throws IOExceptio
397399

398400
public boolean buildPartial(long dataPosition, Consumer<PartitionIndex> callWhenReady)
399401
{
400-
rowIndexFile.updateFileHandle(rowIndexFHBuilder);
401402
return partitionIndex.buildPartial(callWhenReady, rowIndexFile.position(), dataPosition);
402403
}
403404

@@ -450,8 +451,6 @@ protected void doPrepare()
450451

451452
// truncate index file
452453
rowIndexFile.prepareToCommit();
453-
rowIndexFHBuilder.withLength(rowIndexFile.getLastFlushOffset());
454-
//TODO figure out whether the update should be done before or after the prepare to commit
455454
rowIndexFile.updateFileHandle(rowIndexFHBuilder);
456455

457456
complete();

0 commit comments

Comments
 (0)