Skip to content

Commit

Permalink
add RealtimeSegmentConverter test for index reuse path
Browse files Browse the repository at this point in the history
  • Loading branch information
itschrispeck committed Apr 1, 2024
1 parent 9e91947 commit 5e0d5c2
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -38,6 +40,7 @@
import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndexSearcherPool;
import org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
Expand All @@ -46,9 +49,12 @@
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.TextIndexConfig;
import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
Expand All @@ -60,6 +66,8 @@
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -103,7 +111,7 @@ public void testNoRecordsIndexedRowMajorSegmentBuilder()
throws Exception {
File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
.setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1)).setSortedColumn(LONG_COLUMN1)
.setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
.setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2))
Expand Down Expand Up @@ -169,7 +177,7 @@ public void test10RecordsIndexedRowMajorSegmentBuilder()
throws Exception {
File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
new TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
.setTimeColumnName(DATE_TIME_COLUMN)
.setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1, LONG_COLUMN1))
.setSortedColumn(LONG_COLUMN1)
Expand Down Expand Up @@ -254,7 +262,7 @@ public void testNoRecordsIndexedColumnMajorSegmentBuilder()
throws Exception {
File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
.setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1)).setSortedColumn(LONG_COLUMN1)
.setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
.setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2))
Expand Down Expand Up @@ -321,7 +329,7 @@ public void test10RecordsIndexedColumnMajorSegmentBuilder()
throws Exception {
File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
new TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
.setTimeColumnName(DATE_TIME_COLUMN)
.setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1, LONG_COLUMN1))
.setSortedColumn(LONG_COLUMN1)
Expand Down Expand Up @@ -435,6 +443,130 @@ private void testSegment(List<GenericRow> rows, File indexDir,
}
}

@DataProvider
public static Object[][] reuseParams() {
List<Boolean> enabledColumnMajorSegmentBuildParams = Arrays.asList(false, true);
String[] sortedColumnParams = new String[]{null, STRING_COLUMN1};

return enabledColumnMajorSegmentBuildParams.stream().flatMap(
columnMajor -> Arrays.stream(sortedColumnParams).map(sortedColumn -> new Object[]{columnMajor,
sortedColumn}))
.toArray(Object[][]::new);
}

// Test the realtime segment conversion of a table with an index that reuses mutable index artifacts during conversion
@Test(dataProvider = "reuseParams")
public void testSegmentBuilderWithReuse(boolean columnMajorSegmentBuilder, String sortedColumn)
throws Exception {
File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
FieldConfig textIndexFieldConfig =
new FieldConfig.Builder(STRING_COLUMN1).withEncodingType(FieldConfig.EncodingType.RAW)
.withIndexTypes(Collections.singletonList(FieldConfig.IndexType.TEXT)).build();
List<FieldConfig> fieldConfigList = Collections.singletonList(textIndexFieldConfig);
TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
.setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1))
.setSortedColumn(sortedColumn).setColumnMajorSegmentBuilderEnabled(columnMajorSegmentBuilder)
.setFieldConfigList(fieldConfigList).build();
Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(STRING_COLUMN1, FieldSpec.DataType.STRING)
.addDateTime(DATE_TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();

String tableNameWithType = tableConfig.getTableName();
String segmentName = "testTable__0__0__123456";
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
TextIndexConfig textIndexConfig =
new TextIndexConfig(false, null, null, false, false, Collections.emptyList(), Collections.emptyList(), false,
500, null, false);

RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
new RealtimeSegmentConfig.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
.setStreamName(tableNameWithType).setSchema(schema).setTimeColumnName(DATE_TIME_COLUMN).setCapacity(1000)
.setIndex(Sets.newHashSet(STRING_COLUMN1), StandardIndexes.inverted(), IndexConfig.ENABLED)
.setIndex(Sets.newHashSet(STRING_COLUMN1), StandardIndexes.text(), textIndexConfig)
.setFieldConfigList(fieldConfigList).setSegmentZKMetadata(getSegmentZKMetadata(segmentName))
.setOffHeap(true).setMemoryManager(new DirectMemoryManager(segmentName))
.setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new File(tmpDir, "stats")))
.setConsumerDir(new File(tmpDir, "consumers").getAbsolutePath());

// create mutable segment impl
RealtimeLuceneTextIndexSearcherPool.init(1);
MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
List<GenericRow> rows = generateTestDataForReusePath();

for (GenericRow row : rows) {
mutableSegmentImpl.index(row, null);
}

// build converted segment
File outputDir = new File(new File(tmpDir, segmentName), "tmp-" + segmentName + "-" + System.currentTimeMillis());
SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
segmentZKPropsConfig.setStartOffset("1");
segmentZKPropsConfig.setEndOffset("100");
ColumnIndicesForRealtimeTable cdc = new ColumnIndicesForRealtimeTable(sortedColumn,
indexingConfig.getInvertedIndexColumns(), Collections.singletonList(STRING_COLUMN1), null,
indexingConfig.getNoDictionaryColumns(), indexingConfig.getVarLengthDictionaryColumns());
RealtimeSegmentConverter converter =
new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig, outputDir.getAbsolutePath(), schema,
tableNameWithType, tableConfig, segmentName, cdc, false);
converter.build(SegmentVersion.v3, null);

File indexDir = new File(outputDir, segmentName);
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
assertEquals(segmentMetadata.getTotalDocs(), rows.size());
assertEquals(segmentMetadata.getTimeColumn(), DATE_TIME_COLUMN);
assertEquals(segmentMetadata.getTimeUnit(), TimeUnit.MILLISECONDS);

long expectedStartTime = (long) rows.get(0).getValue(DATE_TIME_COLUMN);
assertEquals(segmentMetadata.getStartTime(), expectedStartTime);
long expectedEndTime = (long) rows.get(rows.size() - 1).getValue(DATE_TIME_COLUMN);
assertEquals(segmentMetadata.getEndTime(), expectedEndTime);

assertTrue(segmentMetadata.getAllColumns().containsAll(schema.getColumnNames()));
assertEquals(segmentMetadata.getStartOffset(), "1");
assertEquals(segmentMetadata.getEndOffset(), "100");

// read converted segment
SegmentLocalFSDirectory segmentDir = new SegmentLocalFSDirectory(indexDir, segmentMetadata, ReadMode.mmap);
SegmentDirectory.Reader segmentReader = segmentDir.createReader();

Map<String, ColumnIndexContainer> indexContainerMap = new HashMap<>();
Map<String, ColumnMetadata> columnMetadataMap = segmentMetadata.getColumnMetadataMap();
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
for (Map.Entry<String, ColumnMetadata> entry : columnMetadataMap.entrySet()) {
indexContainerMap.put(entry.getKey(),
new PhysicalColumnIndexContainer(segmentReader, entry.getValue(), indexLoadingConfig));
}
ImmutableSegmentImpl segmentFile = new ImmutableSegmentImpl(segmentDir, segmentMetadata, indexContainerMap, null);

// test forward index contents
GenericRow readRow = new GenericRow();
int docId = 0;
for (int i = 0; i < rows.size(); i++) {
GenericRow row;
if (sortedColumn == null) {
row = rows.get(i);
} else {
row = rows.get(rows.size() - i - 1);
}

segmentFile.getRecord(docId, readRow);
assertEquals(readRow.getValue(STRING_COLUMN1), row.getValue(STRING_COLUMN1));
assertEquals(readRow.getValue(DATE_TIME_COLUMN), row.getValue(DATE_TIME_COLUMN));
docId += 1;
}

// test docId conversion
TextIndexReader textIndexReader = segmentFile.getIndex(STRING_COLUMN1, StandardIndexes.text());
if (sortedColumn == null) {
assertEquals(textIndexReader.getDocIds("str-8"), ImmutableRoaringBitmap.bitmapOf(0));
assertEquals(textIndexReader.getDocIds("str-4"), ImmutableRoaringBitmap.bitmapOf(4));
} else {
assertEquals(textIndexReader.getDocIds("str-8"), ImmutableRoaringBitmap.bitmapOf(7));
assertEquals(textIndexReader.getDocIds("str-4"), ImmutableRoaringBitmap.bitmapOf(3));
}
}

private List<GenericRow> generateTestData() {
LinkedList<GenericRow> rows = new LinkedList<>();

Expand All @@ -459,6 +591,19 @@ private List<GenericRow> generateTestData() {
return rows;
}

private List<GenericRow> generateTestDataForReusePath() {
List<GenericRow> rows = new LinkedList<>();

for (int i = 0; i < 8; i++) {
GenericRow row = new GenericRow();
row.putValue(STRING_COLUMN1, "str" + (i - 8));
row.putValue(DATE_TIME_COLUMN, 1697814309L + i);
rows.add(row);
}

return rows;
}

private SegmentZKMetadata getSegmentZKMetadata(String segmentName) {
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
segmentZKMetadata.setCreationTime(System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public LuceneTextIndexCreator(String column, File segmentIndexDir, boolean commi

if (_reuseMutableIndex) {
LOGGER.info("Reusing the realtime lucene index for segment {} and column {}", segmentIndexDir, column);
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
convertMutableSegment(segmentIndexDir, sortedDocIds, indexWriterConfig);
return;
}
Expand Down

0 comments on commit 5e0d5c2

Please sign in to comment.