Skip to content

Commit

Permalink
ORC-1525: Fix bad read in RleDecoderV2::readByte
Browse files Browse the repository at this point in the history
This PR aims to fix #1640 by resetting `BooleanRleEncoderImpl::current` and `BooleanRleEncoderImpl::bitsRemained` when the present stream is suppressed.

As described in #1640, suppressing a present stream that contains no nulls leaves dirty data in `BooleanRleEncoderImpl::current` and `BooleanRleEncoderImpl::bitsRemained`, which is then flushed into the next stripe's present stream if that stripe has some null values.

I have added a test, testSuppressPresentStreamInPreStripe, which constructs an ORC file with two stripes: the first stripe has no null values and the second stripe has some null values. While this file is being written, the writer has some dirty data in BooleanRleEncoderImpl for the present stream. The test checks that the file can be read back successfully and that the values read are the same as the values written.

Closes #1640 .

Closes #1641 from hoffermei/present_supress_bugfix.

Lead-authored-by: hoffermei <meihaifeng.hust@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 24beffb)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
hoffermei authored and dongjoon-hyun committed Nov 1, 2023
1 parent 6322c1e commit 3487615
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 0 deletions.
8 changes: 8 additions & 0 deletions c++/src/ByteRLE.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ namespace orc {

virtual void recordPosition(PositionRecorder* recorder) const override;

// Overrides suppress(): the definition calls ByteRleEncoderImpl::suppress()
// and then resets bitsRemained and current.
virtual void suppress() override;

private:
int bitsRemained;
char current;
Expand Down Expand Up @@ -291,6 +293,12 @@ namespace orc {
recorder->add(static_cast<uint64_t>(8 - bitsRemained));
}

/**
 * Suppress this encoder's stream and reset the bit-packing state.
 *
 * After delegating to ByteRleEncoderImpl::suppress(), the partially packed
 * byte is cleared so that bits accumulated for the suppressed stream are not
 * carried over into whatever is encoded next.
 */
void BooleanRleEncoderImpl::suppress() {
  ByteRleEncoderImpl::suppress();

  // Start again from an empty byte: nothing packed, all 8 bits available.
  current = '\0';
  bitsRemained = 8;
}

std::unique_ptr<ByteRleEncoder> createBooleanRleEncoder(
std::unique_ptr<BufferedOutputStream> output) {
BooleanRleEncoderImpl* encoder = new BooleanRleEncoderImpl(std::move(output));
Expand Down
78 changes: 78 additions & 0 deletions c++/test/TestWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2113,6 +2113,84 @@ namespace orc {
EXPECT_FALSE(rowReader->next(*batch));
}

// The first stripe has no null values and the second stripe has null values.
// Make sure stripes do not have dirty data in their present streams: the file
// must be readable and every value read must match what was written.
TEST_P(WriterTest, testSuppressPresentStreamInPreStripe) {
  MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
  MemoryPool* pool = getDefaultPool();

  // Row indices are 1-based:
  //   [1, 998000):        not null, value is equal to the row index
  //   [998000, 999000):   null
  //   [999000, 1000000]:  not null, value is equal to the row index
  const size_t rowCount = 1000000;
  const size_t nullBeginCount = 998000;
  const size_t nullEndCount = 999000;
  const size_t batchSize = 5;

  // Write the file.
  {
    auto type = std::unique_ptr<Type>(Type::buildTypeFromString("struct<col1:int>"));
    WriterOptions options;
    options.setStripeSize(16 * 1024)
        .setCompressionBlockSize(1024)
        .setCompression(CompressionKind_NONE)
        .setMemoryPool(pool)
        .setRowIndexStride(1000);

    auto writer = createWriter(*type, &memStream, options);

    // A single batch is reused for every add(); each pass rewrites the null
    // flag of every row, so no state from the previous pass is relied upon.
    auto batch = writer->createRowBatch(batchSize);
    auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
    auto& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);

    const uint64_t batchCount = rowCount / batchSize;
    size_t rowsWrite = 0;
    for (uint64_t batchIdx = 0; batchIdx < batchCount; ++batchIdx) {
      structBatch.numElements = batchSize;
      longBatch.numElements = batchSize;
      longBatch.hasNulls = false;
      for (size_t row = 0; row < batchSize; ++row) {
        const size_t rowIndex = rowsWrite + row + 1;
        if (rowIndex < nullBeginCount || rowIndex >= nullEndCount) {
          longBatch.notNull[row] = 1;
          longBatch.data[row] = static_cast<int64_t>(rowIndex);
        } else {
          longBatch.notNull[row] = 0;
          longBatch.hasNulls = true;
        }
      }

      writer->add(*batch);
      rowsWrite += batch->numElements;
    }
    writer->close();
  }

  // Read the file back and check that every column value is correct.
  {
    auto inStream =
        std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength());
    ReaderOptions readerOptions;
    readerOptions.setMemoryPool(*pool);
    std::unique_ptr<Reader> reader = createReader(std::move(inStream), readerOptions);
    EXPECT_EQ(reader->getNumberOfStripes(), 2);
    EXPECT_EQ(rowCount, reader->getNumberOfRows());

    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
    auto batch = rowReader->createRowBatch(1000);
    auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
    auto& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);

    size_t rowsRead = 0;
    while (rowsRead < rowCount) {
      // Fatal assertion on purpose: if next() fails and delivers no rows,
      // rowsRead would stop advancing and this loop might never terminate.
      ASSERT_TRUE(rowReader->next(*batch));
      for (size_t i = 0; i < batch->numElements; ++i) {
        const size_t rowIndex = rowsRead + i + 1;
        if (rowIndex < nullBeginCount || rowIndex >= nullEndCount) {
          EXPECT_TRUE(longBatch.notNull[i]);
          EXPECT_EQ(longBatch.data[i], static_cast<int64_t>(rowIndex));
        } else {
          EXPECT_FALSE(longBatch.notNull[i]);
        }
      }
      rowsRead += batch->numElements;
    }

    // All rows have been consumed; the reader must not produce any more.
    EXPECT_FALSE(rowReader->next(*batch));
  }
}

// Instantiate the parameterized WriterTest suite under the "OrcTest" prefix,
// once for each of the listed file versions.
INSTANTIATE_TEST_SUITE_P(OrcTest, WriterTest,
Values(FileVersion::v_0_11(), FileVersion::v_0_12(),
FileVersion::UNSTABLE_PRE_2_0()));
Expand Down

0 comments on commit 3487615

Please sign in to comment.