diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java index ee4a0ad6d416..a225793437f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java @@ -23,7 +23,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.source.DataSourceOperator; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; -import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import com.google.common.util.concurrent.SettableFuture; @@ -68,13 +68,15 @@ protected boolean init(SettableFuture blockedFuture) { } /** - * Init seq file list and unseq file list in {@link QueryDataSource} and set it into each + * Init seq file list and unseq file list in {@link + * org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} and set it into each * SourceNode. * * @throws QueryProcessException while failed to init query resource, QueryProcessException will * be thrown - * @throws IllegalStateException if {@link QueryDataSource} is null after initialization, - * IllegalStateException will be thrown + * @throws IllegalStateException if {@link + * org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} is null after + * initialization, IllegalStateException will be thrown */ private void initialize() throws QueryProcessException { long startTime = System.nanoTime(); @@ -82,7 +84,7 @@ private void initialize() throws QueryProcessException { List sourceOperators = ((DataDriverContext) driverContext).getSourceOperators(); if (sourceOperators != null && !sourceOperators.isEmpty()) { - QueryDataSource dataSource = initQueryDataSource(); + IQueryDataSource dataSource = initQueryDataSource(); if (dataSource == null) { // If this driver is being initialized, meanwhile the whole FI was aborted or cancelled // for some reasons, we may get null QueryDataSource here. @@ -92,14 +94,7 @@ private void initialize() throws QueryProcessException { sourceOperators.forEach( sourceOperator -> { // Construct QueryDataSource for source operator - QueryDataSource queryDataSource = - new QueryDataSource(dataSource.getSeqResources(), dataSource.getUnseqResources()); - - queryDataSource.setSingleDevice(dataSource.isSingleDevice()); - - queryDataSource.setDataTTL(dataSource.getDataTTL()); - - sourceOperator.initQueryDataSource(queryDataSource); + sourceOperator.initQueryDataSource(dataSource.clone()); }); } @@ -118,12 +113,12 @@ protected void releaseResource() { /** * The method is called in mergeLock() when executing query. This method will get all the {@link - * QueryDataSource} needed for this query. + * org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} needed for this query. 
* * @throws QueryProcessException while failed to init query resource, QueryProcessException will * be thrown */ - private QueryDataSource initQueryDataSource() throws QueryProcessException { + private IQueryDataSource initQueryDataSource() throws QueryProcessException { return ((DataDriverContext) driverContext).getSharedQueryDataSource(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriverContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriverContext.java index c51c02c08715..335133b19072 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriverContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriverContext.java @@ -24,7 +24,7 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.queryengine.execution.operator.source.DataSourceOperator; import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery; -import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import java.util.ArrayList; import java.util.List; @@ -69,7 +69,7 @@ public IDataRegionForQuery getDataRegion() { return getFragmentInstanceContext().getDataRegion(); } - public QueryDataSource getSharedQueryDataSource() throws QueryProcessException { + public IQueryDataSource getSharedQueryDataSource() throws QueryProcessException { return getFragmentInstanceContext().getSharedQueryDataSource(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 3d72278cc13e..7ec5503dda80 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -29,11 +29,15 @@ import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate; import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegionScan; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.filter.basic.Filter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +45,6 @@ import java.time.ZoneId; import java.util.ArrayList; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -64,8 +67,11 @@ public class FragmentInstanceContext extends QueryContext { // it will only be used once, after sharedQueryDataSource being inited, it will be set to null private List sourcePaths; + // Used for region scan, relating 
methods are to be added. + private Map devicePathsToAligned; + // Shared by all scan operators in this fragment instance to avoid memory problem - private QueryDataSource sharedQueryDataSource; + private IQueryDataSource sharedQueryDataSource; /** closed tsfile used in this fragment instance. */ private Set closedFilePaths; @@ -80,6 +86,8 @@ public class FragmentInstanceContext extends QueryContext { // empty for zero time partitions private List timePartitions; + private QueryDataSourceType queryDataSourceType = QueryDataSourceType.SERIES_SCAN; + private final AtomicLong startNanos = new AtomicLong(); private final AtomicLong endNanos = new AtomicLong(); @@ -153,6 +161,10 @@ public static FragmentInstanceContext createFragmentInstanceContextForCompaction return new FragmentInstanceContext(queryId); } + public void setQueryDataSourceType(QueryDataSourceType queryDataSourceType) { + this.queryDataSourceType = queryDataSourceType; + } + @TestOnly public static FragmentInstanceContext createFragmentInstanceContext( FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) { @@ -374,8 +386,31 @@ public void initQueryDataSource(List sourcePaths) throws QueryProce if (sharedQueryDataSource != null) { closedFilePaths = new HashSet<>(); unClosedFilePaths = new HashSet<>(); - addUsedFilesForQuery(sharedQueryDataSource); - sharedQueryDataSource.setSingleDevice(selectedDeviceIdSet.size() == 1); + addUsedFilesForQuery((QueryDataSource) sharedQueryDataSource); + ((QueryDataSource) sharedQueryDataSource).setSingleDevice(selectedDeviceIdSet.size() == 1); + } + } finally { + setInitQueryDataSourceCost(System.nanoTime() - startTime); + dataRegion.readUnlock(); + } + } + + public void initRegionScanQueryDataSource(Map devicePathToAligned) + throws QueryProcessException { + long startTime = System.nanoTime(); + dataRegion.readLock(); + try { + this.sharedQueryDataSource = + dataRegion.queryForDeviceRegionScan( + devicePathToAligned, + this, + globalTimeFilter != null ? globalTimeFilter.copy() : null, + timePartitions); + + if (sharedQueryDataSource != null) { + closedFilePaths = new HashSet<>(); + unClosedFilePaths = new HashSet<>(); + addUsedFilesForRegionQuery((QueryDataSourceForRegionScan) sharedQueryDataSource); } } finally { setInitQueryDataSourceCost(System.nanoTime() - startTime); @@ -383,50 +418,111 @@ public void initQueryDataSource(List sourcePaths) throws QueryProce } } - public synchronized QueryDataSource getSharedQueryDataSource() throws QueryProcessException { + public void initRegionScanQueryDataSource(List pathList) + throws QueryProcessException { + long startTime = System.nanoTime(); + dataRegion.readLock(); + try { + this.sharedQueryDataSource = + dataRegion.queryForSeriesRegionScan( + pathList, + this, + globalTimeFilter != null ? 
globalTimeFilter.copy() : null, + timePartitions); + + if (sharedQueryDataSource != null) { + closedFilePaths = new HashSet<>(); + unClosedFilePaths = new HashSet<>(); + addUsedFilesForRegionQuery((QueryDataSourceForRegionScan) sharedQueryDataSource); + } + } finally { + setInitQueryDataSourceCost(System.nanoTime() - startTime); + dataRegion.readUnlock(); + } + } + + public synchronized IQueryDataSource getSharedQueryDataSource() throws QueryProcessException { if (sharedQueryDataSource == null) { - initQueryDataSource(sourcePaths); - // friendly for gc - sourcePaths = null; + switch (queryDataSourceType) { + case SERIES_SCAN: + initQueryDataSource(sourcePaths); + // Friendly for gc + sourcePaths = null; + break; + case DEVICE_REGION_SCAN: + initRegionScanQueryDataSource(devicePathsToAligned); + devicePathsToAligned = null; + break; + case TIME_SERIES_REGION_SCAN: + initRegionScanQueryDataSource(sourcePaths); + sourcePaths = null; + break; + default: + throw new QueryProcessException( + "Unsupported query data source type: " + queryDataSourceType); + } } return sharedQueryDataSource; } + /** Lock and check if tsFileResource is deleted */ + private boolean processTsFileResource(TsFileResource tsFileResource, boolean isClosed) { + addFilePathToMap(tsFileResource, isClosed); + // this file may be deleted just before we lock it + if (tsFileResource.isDeleted()) { + Set pathSet = isClosed ? closedFilePaths : unClosedFilePaths; + // This resource may be removed by other threads of this query. + if (pathSet.remove(tsFileResource)) { + FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, isClosed); + } + return true; + } else { + return false; + } + } + /** Add the unique file paths to closeddFilePathsMap and unClosedFilePathsMap. */ private void addUsedFilesForQuery(QueryDataSource dataSource) { // sequence data - addUsedFilesForQuery(dataSource.getSeqResources()); + dataSource + .getSeqResources() + .removeIf( + tsFileResource -> processTsFileResource(tsFileResource, tsFileResource.isClosed())); // Record statistics of seqFiles unclosedSeqFileNum = unClosedFilePaths.size(); closedSeqFileNum = closedFilePaths.size(); // unsequence data - addUsedFilesForQuery(dataSource.getUnseqResources()); + dataSource + .getUnseqResources() + .removeIf( + tsFileResource -> processTsFileResource(tsFileResource, tsFileResource.isClosed())); // Record statistics of files of unseqFiles unclosedUnseqFileNum = unClosedFilePaths.size() - unclosedSeqFileNum; closedUnseqFileNum = closedFilePaths.size() - closedSeqFileNum; } - private void addUsedFilesForQuery(List resources) { - Iterator iterator = resources.iterator(); - while (iterator.hasNext()) { - TsFileResource tsFileResource = iterator.next(); - boolean isClosed = tsFileResource.isClosed(); - addFilePathToMap(tsFileResource, isClosed); - - // this file may be deleted just before we lock it - if (tsFileResource.isDeleted()) { - Set pathSet = isClosed ? closedFilePaths : unClosedFilePaths; - // This resource may be removed by other threads of this query. 
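
The FragmentInstanceContext hunk above folds the old Iterator-based file bookkeeping into Collection.removeIf plus a shared processTsFileResource helper: the predicate registers the file, detects a concurrent deletion, undoes the registration, and lets removeIf drop the file from the data source. A minimal standalone sketch of that idiom follows; Resource and ResourceTracker are illustrative stand-ins, not the IoTDB types.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the removeIf-plus-helper idiom: the predicate both registers the
// resource and reports whether it was deleted concurrently, in which case
// removeIf drops it from the list.
class ResourceTracker {
    static final class Resource {
        final String path;
        volatile boolean deleted;
        Resource(String path) { this.path = path; }
    }

    private final Set<Resource> usedResources = new HashSet<>();

    /** Returns true if the resource turned out to be deleted and must be skipped. */
    private boolean track(Resource r) {
        usedResources.add(r);            // register first (mirrors addFilePathToMap)
        if (r.deleted) {                 // it may have been deleted just before we looked
            usedResources.remove(r);     // undo the registration
            return true;                 // tell removeIf to drop it
        }
        return false;
    }

    void addUsedResources(List<Resource> resources) {
        resources.removeIf(this::track); // same shape as dataSource.getSeqResources().removeIf(...)
    }

    public static void main(String[] args) {
        Resource live = new Resource("f1.tsfile");
        Resource gone = new Resource("f2.tsfile");
        gone.deleted = true;
        List<Resource> resources = new ArrayList<>(Arrays.asList(live, gone));
        new ResourceTracker().addUsedResources(resources);
        System.out.println(resources.size()); // 1: the deleted file was dropped
    }
}

The same shape serves both the series-scan QueryDataSource and the new QueryDataSourceForRegionScan file-scan handles in the hunks above and below.
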
- if (pathSet.remove(tsFileResource)) { - FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, isClosed); - } - iterator.remove(); - } - } + private void addUsedFilesForRegionQuery(QueryDataSourceForRegionScan dataSource) { + dataSource + .getSeqFileScanHandles() + .removeIf( + fileScanHandle -> + processTsFileResource(fileScanHandle.getTsResource(), fileScanHandle.isClosed())); + + unclosedSeqFileNum = unClosedFilePaths.size(); + closedSeqFileNum = closedFilePaths.size(); + + dataSource + .getUnseqFileScanHandles() + .removeIf( + fileScanHandle -> + processTsFileResource(fileScanHandle.getTsResource(), fileScanHandle.isClosed())); + + unclosedUnseqFileNum = unClosedFilePaths.size() - unclosedSeqFileNum; + closedUnseqFileNum = closedFilePaths.size() - closedSeqFileNum; } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java index e774cd6de982..a9b5c2991808 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.fragment; +import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PatternTreeMap; @@ -30,6 +31,7 @@ import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory.ModsSerializer; import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; import java.util.ArrayList; import java.util.Collections; @@ -79,34 +81,70 @@ public QueryContext(long queryId, boolean debug, long startTime, long timeout) { this.timeout = timeout; } - /** - * Find the modifications of timeseries 'path' in 'modFile'. If they are not in the cache, read - * them from 'modFile' and put then into the cache. 
- */ - public List getPathModifications(TsFileResource tsFileResource, PartialPath path) { - // if the mods file does not exist, do not add it to the cache + private boolean checkIfModificationExists(TsFileResource tsFileResource) { if (nonExistentModFiles.contains(tsFileResource.getTsFileID())) { - return Collections.emptyList(); + return false; } ModificationFile modFile = tsFileResource.getModFile(); if (!modFile.exists()) { nonExistentModFiles.add(tsFileResource.getTsFileID()); + return false; + } + return true; + } + + private PatternTreeMap getAllModifications( + ModificationFile modFile) { + return fileModCache.computeIfAbsent( + modFile.getFilePath(), + k -> { + PatternTreeMap modifications = + PatternTreeMapFactory.getModsPatternTreeMap(); + for (Modification modification : modFile.getModificationsIter()) { + modifications.append(modification.getPath(), modification); + } + return modifications; + }); + } + + public List getPathModifications( + TsFileResource tsFileResource, IDeviceID deviceID, String measurement) + throws IllegalPathException { + // if the mods file does not exist, do not add it to the cache + if (!checkIfModificationExists(tsFileResource)) { + return Collections.emptyList(); + } + + return ModificationFile.sortAndMerge( + getAllModifications(tsFileResource.getModFile()) + .getOverlapped(new PartialPath(deviceID, measurement))); + } + + public List getPathModifications(TsFileResource tsFileResource, IDeviceID deviceID) + throws IllegalPathException { + // if the mods file does not exist, do not add it to the cache + if (!checkIfModificationExists(tsFileResource)) { + return Collections.emptyList(); + } + + return ModificationFile.sortAndMerge( + getAllModifications(tsFileResource.getModFile()) + .getDeviceOverlapped(new PartialPath(deviceID))); + } + + /** + * Find the modifications of timeseries 'path' in 'modFile'. If they are not in the cache, read + * them from 'modFile' and put then into the cache. 
+ */ + public List getPathModifications(TsFileResource tsFileResource, PartialPath path) { + // if the mods file does not exist, do not add it to the cache + if (!checkIfModificationExists(tsFileResource)) { return Collections.emptyList(); } - PatternTreeMap allModifications = - fileModCache.computeIfAbsent( - modFile.getFilePath(), - k -> { - PatternTreeMap modifications = - PatternTreeMapFactory.getModsPatternTreeMap(); - for (Modification modification : modFile.getModificationsIter()) { - modifications.append(modification.getPath(), modification); - } - return modifications; - }); - return ModificationFile.sortAndMerge(allModifications.getOverlapped(path)); + return ModificationFile.sortAndMerge( + getAllModifications(tsFileResource.getModFile()).getOverlapped(path)); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java index 1d257eef50a4..47fe0acbf877 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.source; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.tsfile.enums.TSDataType; @@ -34,8 +35,8 @@ public abstract class AbstractDataSourceOperator extends AbstractSourceOperator protected TsBlockBuilder resultTsBlockBuilder; @Override - public void initQueryDataSource(QueryDataSource dataSource) { - seriesScanUtil.initQueryDataSource(dataSource); + public void initQueryDataSource(IQueryDataSource dataSource) { + seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource); resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java index be4d3f439317..22cb94b9ec19 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.tsfile.block.column.Column; @@ -224,8 +225,8 @@ protected List getResultDataTypes() { } @Override - public void initQueryDataSource(QueryDataSource dataSource) { - seriesScanUtil.initQueryDataSource(dataSource); + public void initQueryDataSource(IQueryDataSource dataSource) { + seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource); resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); 
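
The QueryContext changes above split getPathModifications into a shared existence check plus getAllModifications, which memoizes one parsed PatternTreeMap per mods-file path through Map.computeIfAbsent, so the new deviceID/measurement overloads and the original PartialPath overload all reuse the same cached index. A small sketch of that memoization shape, assuming a hypothetical loader function in place of the real mods-file parsing:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of the computeIfAbsent-based cache shape used by getAllModifications:
// the expensive parse of a mods file runs at most once per file path, later
// callers reuse the cached result. The loader is a stand-in for the real parse.
class ModCache<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader;

    ModCache(Function<K, V> loader) { this.loader = loader; }

    V get(K key) {
        // computeIfAbsent guarantees the loader runs at most once per key
        return cache.computeIfAbsent(key, loader);
    }

    public static void main(String[] args) {
        ModCache<String, Integer> lengths = new ModCache<>(String::length);
        System.out.println(lengths.get("a.mods")); // computed on first access
        System.out.println(lengths.get("a.mods")); // served from the cache
    }
}
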
resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/DataSourceOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/DataSourceOperator.java index 3cecdd3a8a87..ec03d19a69bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/DataSourceOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/DataSourceOperator.java @@ -19,9 +19,9 @@ package org.apache.iotdb.db.queryengine.execution.operator.source; -import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; public interface DataSourceOperator extends SourceOperator { - void initQueryDataSource(QueryDataSource dataSource); + void initQueryDataSource(IQueryDataSource dataSource); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java index 36ba50ef6950..812edc6b43bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java @@ -31,7 +31,6 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk; import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup; import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk; -import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.ModificationUtils; @@ -90,20 +89,6 @@ public abstract ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( public abstract List getVisibleMetadataListFromWriter( RestorableTsFileIOWriter writer, TsFileResource tsFileResource, QueryContext context); - - /** get modifications from a memtable. 
*/ - protected List getModificationsForMemtable( - IMemTable memTable, List> modsToMemtable) { - List modifications = new ArrayList<>(); - boolean foundMemtable = false; - for (Pair entry : modsToMemtable) { - if (foundMemtable || entry.right.equals(memTable)) { - modifications.add(entry.left); - foundMemtable = true; - } - } - return modifications; - } } class AlignedResourceByPathUtils extends ResourceByPathUtils { @@ -218,7 +203,9 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( TVList alignedTvListCopy = alignedMemChunk.getSortedTvListForQuery(partialPath.getSchemaList()); List> deletionList = null; if (modsToMemtable != null) { - deletionList = constructDeletionList(memTable, modsToMemtable, timeLowerBound); + deletionList = + ModificationUtils.constructDeletionList( + partialPath, memTable, modsToMemtable, timeLowerBound); } return new AlignedReadOnlyMemChunk( context, getMeasurementSchema(), alignedTvListCopy, deletionList); @@ -245,28 +232,6 @@ public VectorMeasurementSchema getMeasurementSchema() { partialPath.getSchemaList().get(0).getCompressor()); } - private List> constructDeletionList( - IMemTable memTable, List> modsToMemtable, long timeLowerBound) { - List> deletionList = new ArrayList<>(); - for (String measurement : partialPath.getMeasurementList()) { - List columnDeletionList = new ArrayList<>(); - columnDeletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound)); - for (Modification modification : getModificationsForMemtable(memTable, modsToMemtable)) { - if (modification instanceof Deletion) { - Deletion deletion = (Deletion) modification; - PartialPath fullPath = partialPath.concatNode(measurement); - if (deletion.getPath().matchFullPath(fullPath) - && deletion.getEndTime() > timeLowerBound) { - long lowerBound = Math.max(deletion.getStartTime(), timeLowerBound); - columnDeletionList.add(new TimeRange(lowerBound, deletion.getEndTime())); - } - } - } - deletionList.add(TimeRange.sortAndMerge(columnDeletionList)); - } - return deletionList; - } - @Override public List getVisibleMetadataListFromWriter( RestorableTsFileIOWriter writer, TsFileResource tsFileResource, QueryContext context) { @@ -371,7 +336,9 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( TVList chunkCopy = memChunk.getSortedTvListForQuery(); List deletionList = null; if (modsToMemtable != null) { - deletionList = constructDeletionList(memTable, modsToMemtable, timeLowerBound); + deletionList = + ModificationUtils.constructDeletionList( + partialPath, memTable, modsToMemtable, timeLowerBound); } return new ReadOnlyMemChunk( context, @@ -383,44 +350,6 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( deletionList); } - /** - * construct a deletion list from a memtable. - * - * @param memTable memtable - * @param timeLowerBound time watermark - */ - private List constructDeletionList( - IMemTable memTable, List> modsToMemtable, long timeLowerBound) { - List deletionList = new ArrayList<>(); - deletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound)); - for (Modification modification : getModificationsForMemtable(memTable, modsToMemtable)) { - if (modification instanceof Deletion) { - Deletion deletion = (Deletion) modification; - if (deletion.getPath().matchFullPath(partialPath) - && deletion.getEndTime() > timeLowerBound) { - long lowerBound = Math.max(deletion.getStartTime(), timeLowerBound); - deletionList.add(new TimeRange(lowerBound, deletion.getEndTime())); - } - } - } - return TimeRange.sortAndMerge(deletionList); - } - - /** get modifications from a memtable. 
*/ - @Override - protected List getModificationsForMemtable( - IMemTable memTable, List> modsToMemtable) { - List modifications = new ArrayList<>(); - boolean foundMemtable = false; - for (Pair entry : modsToMemtable) { - if (foundMemtable || entry.right.equals(memTable)) { - modifications.add(entry.left); - foundMemtable = true; - } - } - return modifications; - } - @Override public List getVisibleMetadataListFromWriter( RestorableTsFileIOWriter writer, TsFileResource tsFileResource, QueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 8a7f9ff663bf..c5cd3423e746 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -81,8 +81,12 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessorInfo; import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegionScan; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IFileScanHandle; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.ClosedFileScanHandleImpl; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; @@ -1857,34 +1861,6 @@ public QueryDataSource query( } } - /** lock the read lock of the insert lock */ - @Override - public void readLock() { - // apply read lock for SG insert lock to prevent inconsistent with concurrently writing memtable - insertLock.readLock().lock(); - // apply read lock for TsFileResource list - tsFileManager.readLock(); - } - - /** unlock the read lock of insert lock */ - @Override - public void readUnlock() { - tsFileManager.readUnlock(); - insertLock.readLock().unlock(); - } - - /** lock the write lock of the insert lock */ - public void writeLock(String holder) { - insertLock.writeLock().lock(); - insertWriteLockHolder = holder; - } - - /** unlock the write lock of the insert lock */ - public void writeUnlock() { - insertWriteLockHolder = ""; - insertLock.writeLock().unlock(); - } - /** * @param tsFileResources includes sealed and unsealed tsfile resources * @return fill unsealed tsfile resources with memory data and ChunkMetadataList of data in disk @@ -1907,7 +1883,7 @@ private List getFileResourceListForQuery( (globalTimeFilter == null ? "null" : globalTimeFilter)); } - List tsfileResourcesForQuery = new ArrayList<>(); + List tsFileResourcesForQuery = new ArrayList<>(); long timeLowerBound = dataTTL != Long.MAX_VALUE ? 
CommonDateTimeUtils.currentTime() - dataTTL : Long.MIN_VALUE; @@ -1925,9 +1901,9 @@ private List getFileResourceListForQuery( closeQueryLock.readLock().lock(); try { if (tsFileResource.isClosed()) { - tsfileResourcesForQuery.add(tsFileResource); + tsFileResourcesForQuery.add(tsFileResource); } else { - tsFileResource.getProcessor().query(pathList, context, tsfileResourcesForQuery); + tsFileResource.getProcessor().query(pathList, context, tsFileResourcesForQuery); } } catch (IOException e) { throw new MetadataException(e); @@ -1935,7 +1911,175 @@ private List getFileResourceListForQuery( closeQueryLock.readLock().unlock(); } } - return tsfileResourcesForQuery; + return tsFileResourcesForQuery; + } + + @Override + public IQueryDataSource queryForSeriesRegionScan( + List pathList, + QueryContext queryContext, + Filter globalTimeFilter, + List timePartitions) + throws QueryProcessException { + try { + List seqFileScanHandles = + getFileHandleListForQuery( + tsFileManager.getTsFileList(true, timePartitions, globalTimeFilter), + pathList, + queryContext, + globalTimeFilter, + true); + List unseqFileScanHandles = + getFileHandleListForQuery( + tsFileManager.getTsFileList(false, timePartitions, globalTimeFilter), + pathList, + queryContext, + globalTimeFilter, + false); + + QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE, seqFileScanHandles.size()); + QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum( + UNSEQUENCE_TSFILE, unseqFileScanHandles.size()); + + QueryDataSourceForRegionScan dataSource = + new QueryDataSourceForRegionScan(seqFileScanHandles, unseqFileScanHandles); + dataSource.setDataTTL(dataTTL); + return dataSource; + } catch (MetadataException e) { + throw new QueryProcessException(e); + } + } + + private List getFileHandleListForQuery( + Collection tsFileResources, + List partialPaths, + QueryContext context, + Filter globalTimeFilter, + boolean isSeq) + throws MetadataException { + + List fileScanHandles = new ArrayList<>(); + + long timeLowerBound = + dataTTL != Long.MAX_VALUE ? 
CommonDateTimeUtils.currentTime() - dataTTL : Long.MIN_VALUE; + context.setQueryTimeLowerBound(timeLowerBound); + + for (TsFileResource tsFileResource : tsFileResources) { + if (!tsFileResource.isSatisfied(null, globalTimeFilter, isSeq, dataTTL, context.isDebug())) { + continue; + } + closeQueryLock.readLock().lock(); + try { + if (tsFileResource.isClosed()) { + fileScanHandles.add(new ClosedFileScanHandleImpl(tsFileResource, context)); + } else { + tsFileResource + .getProcessor() + .queryForSeriesRegionScan(partialPaths, context, fileScanHandles); + } + } finally { + closeQueryLock.readLock().unlock(); + } + } + return fileScanHandles; + } + + @Override + public IQueryDataSource queryForDeviceRegionScan( + Map devicePathToAligned, + QueryContext queryContext, + Filter globalTimeFilter, + List timePartitions) + throws QueryProcessException { + try { + List seqFileScanHandles = + getFileHandleListForQuery( + tsFileManager.getTsFileList(true, timePartitions, globalTimeFilter), + devicePathToAligned, + queryContext, + globalTimeFilter, + true); + List unseqFileScanHandles = + getFileHandleListForQuery( + tsFileManager.getTsFileList(false, timePartitions, globalTimeFilter), + devicePathToAligned, + queryContext, + globalTimeFilter, + false); + + QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE, seqFileScanHandles.size()); + QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum( + UNSEQUENCE_TSFILE, unseqFileScanHandles.size()); + + QueryDataSourceForRegionScan dataSource = + new QueryDataSourceForRegionScan(seqFileScanHandles, unseqFileScanHandles); + dataSource.setDataTTL(dataTTL); + return dataSource; + } catch (MetadataException e) { + throw new QueryProcessException(e); + } + } + + private List getFileHandleListForQuery( + Collection tsFileResources, + Map devicePathToAligned, + QueryContext context, + Filter globalTimeFilter, + boolean isSeq) + throws MetadataException { + + List fileScanHandles = new ArrayList<>(); + + long timeLowerBound = + dataTTL != Long.MAX_VALUE ? CommonDateTimeUtils.currentTime() - dataTTL : Long.MIN_VALUE; + context.setQueryTimeLowerBound(timeLowerBound); + + for (TsFileResource tsFileResource : tsFileResources) { + if (!tsFileResource.isSatisfied(null, globalTimeFilter, isSeq, dataTTL, context.isDebug())) { + continue; + } + closeQueryLock.readLock().lock(); + try { + if (tsFileResource.isClosed()) { + fileScanHandles.add(new ClosedFileScanHandleImpl(tsFileResource, context)); + } else { + tsFileResource + .getProcessor() + .queryForDeviceRegionScan(devicePathToAligned, context, fileScanHandles); + } + } finally { + closeQueryLock.readLock().unlock(); + } + } + return fileScanHandles; + } + + /** lock the read lock of the insert lock */ + @Override + public void readLock() { + // apply read lock for SG insert lock to prevent inconsistent with concurrently writing memtable + insertLock.readLock().lock(); + // apply read lock for TsFileResource list + tsFileManager.readLock(); + } + + /** unlock the read lock of insert lock */ + @Override + public void readUnlock() { + tsFileManager.readUnlock(); + insertLock.readLock().unlock(); + } + + /** lock the write lock of the insert lock */ + public void writeLock(String holder) { + insertLock.writeLock().lock(); + insertWriteLockHolder = holder; + } + + /** unlock the write lock of the insert lock */ + public void writeUnlock() { + insertWriteLockHolder = ""; + insertLock.writeLock().unlock(); } /** Seperate tsfiles in TsFileManager to sealedList and unsealedList. 
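
queryForSeriesRegionScan and queryForDeviceRegionScan above both delegate to a getFileHandleListForQuery loop: resources rejected by isSatisfied are skipped, closed files become ClosedFileScanHandleImpl instances, and unsealed files let their TsFileProcessor contribute handles, all under the read side of closeQueryLock. A simplified sketch of that selection loop, with FileResource, ScanHandle, and Processor as hypothetical stand-ins for the IoTDB types:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the per-resource selection loop: skip files the time filter rules
// out, hand closed files to a closed-file handle, and let the in-memory
// processor contribute handles for unsealed files.
class RegionScanPlanner {
    interface ScanHandle {}
    interface Processor { void query(List<ScanHandle> out); }

    static final class FileResource {
        boolean closed;
        boolean satisfied;   // stands in for isSatisfied(filter, ttl, ...)
        Processor processor; // only used for unsealed files
    }

    private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();

    List<ScanHandle> collectHandles(List<FileResource> resources) {
        List<ScanHandle> handles = new ArrayList<>();
        for (FileResource r : resources) {
            if (!r.satisfied) {
                continue;                         // filtered out by time filter / TTL
            }
            closeQueryLock.readLock().lock();     // keep the close state stable while we branch
            try {
                if (r.closed) {
                    handles.add(new ScanHandle() {}); // ClosedFileScanHandleImpl in the real code
                } else {
                    r.processor.query(handles);       // unsealed: the processor adds its own handles
                }
            } finally {
                closeQueryLock.readLock().unlock();
            }
        }
        return handles;
    }
}
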
*/ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java index a0a52cf867b4..045ac288b57f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java @@ -21,11 +21,14 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.filter.basic.Filter; import java.util.List; +import java.util.Map; /** It's an interface that storage engine must provide for query engine */ public interface IDataRegionForQuery { @@ -35,7 +38,7 @@ public interface IDataRegionForQuery { void readUnlock(); - /** Get satisfied QueryDataSource from DataRegion */ + /** Get satisfied QueryDataSource from DataRegion for seriesScan */ QueryDataSource query( List pathList, String singleDeviceId, @@ -44,6 +47,21 @@ QueryDataSource query( List timePartitions) throws QueryProcessException; + /** Get satisfied QueryDataSource from DataRegion for regionScan */ + IQueryDataSource queryForDeviceRegionScan( + Map devicePathToAligned, + QueryContext queryContext, + Filter globalTimeFilter, + List timePartitions) + throws QueryProcessException; + + IQueryDataSource queryForSeriesRegionScan( + List pathList, + QueryContext queryContext, + Filter globalTimeFilter, + List timePartitions) + throws QueryProcessException; + /** Get TTL of this DataRegion */ long getDataTTL(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java index 6be5449f9b40..45c1ad75c0e0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java @@ -21,12 +21,16 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegionScan; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.filter.basic.Filter; import java.util.Collections; import java.util.List; +import java.util.Map; /** * It's a virtual data region used for query which contains time series that don't belong to any @@ -39,6 +43,9 @@ public class VirtualDataRegion implements IDataRegionForQuery { private static final QueryDataSource EMPTY_QUERY_DATA_SOURCE = new QueryDataSource(Collections.emptyList(), Collections.emptyList()); + private static final QueryDataSourceForRegionScan EMPTY_REGION_QUERY_DATA_SOURCE = + new QueryDataSourceForRegionScan(Collections.emptyList(), Collections.emptyList()); + public static 
VirtualDataRegion getInstance() { return VirtualDataRegion.InstanceHolder.INSTANCE; } @@ -64,6 +71,26 @@ public QueryDataSource query( return EMPTY_QUERY_DATA_SOURCE; } + @Override + public IQueryDataSource queryForDeviceRegionScan( + Map devicePathToAligned, + QueryContext queryContext, + Filter globalTimeFilter, + List timePartitions) + throws QueryProcessException { + return EMPTY_REGION_QUERY_DATA_SOURCE; + } + + @Override + public IQueryDataSource queryForSeriesRegionScan( + List pathList, + QueryContext queryContext, + Filter globalTimeFilter, + List timePartitions) + throws QueryProcessException { + return EMPTY_REGION_QUERY_DATA_SOURCE; + } + @Override public long getDataTTL() { return Long.MAX_VALUE; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index 2182cec0007e..550b35eeee23 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -20,6 +20,9 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.AlignedPath; +import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Metric; @@ -34,14 +37,25 @@ import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus; import org.apache.iotdb.db.storageengine.dataregion.flush.NotifyFlushMemTable; import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.MemAlignedChunkHandleImpl; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.MemChunkHandleImpl; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; import org.apache.iotdb.db.utils.MemUtils; +import org.apache.iotdb.db.utils.ModificationUtils; +import org.apache.iotdb.db.utils.datastructure.AlignedTVList; +import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.ChunkMetadata; +import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.PlainDeviceID; +import org.apache.tsfile.file.metadata.statistics.TimeStatistics; +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.write.schema.IMeasurementSchema; @@ -49,6 +63,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -57,6 +72,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import 
java.util.stream.Collectors; +import java.util.stream.LongStream; public abstract class AbstractMemTable implements IMemTable { /** Each memTable node has a unique int value identifier, init when recovering wal. */ @@ -465,6 +481,301 @@ public ReadOnlyMemChunk query( .getReadOnlyMemChunkFromMemTable(context, this, modsToMemtable, ttlLowerBound); } + @Override + public void queryForSeriesRegionScan( + PartialPath fullPath, + long ttlLowerBound, + Map> chunkMetaDataMap, + Map> memChunkHandleMap, + List> modsToMemTabled) { + + IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(fullPath.getDevicePath()); + String measurementId = fullPath.getMeasurement(); + Map memTableMap = getMemTableMap(); + + // check If MemTable Contains this path + if (!memTableMap.containsKey(deviceID) || !memTableMap.get(deviceID).contains(measurementId)) { + return; + } + + if (fullPath instanceof MeasurementPath) { + List deletionList = new ArrayList<>(); + if (modsToMemTabled != null) { + deletionList = + ModificationUtils.constructDeletionList( + (MeasurementPath) fullPath, this, modsToMemTabled, ttlLowerBound); + } + getMemChunkHandleFromMemTable( + deviceID, measurementId, chunkMetaDataMap, memChunkHandleMap, deletionList); + } else { + List> deletionList = new ArrayList<>(); + if (modsToMemTabled != null) { + deletionList = + ModificationUtils.constructDeletionList( + (AlignedPath) fullPath, this, modsToMemTabled, ttlLowerBound); + } + + getMemAlignedChunkHandleFromMemTable( + deviceID, + ((AlignedPath) fullPath).getSchemaList(), + chunkMetaDataMap, + memChunkHandleMap, + deletionList); + } + } + + @Override + public void queryForDeviceRegionScan( + IDeviceID deviceID, + boolean isAligned, + long ttlLowerBound, + Map> chunkMetadataMap, + Map> memChunkHandleMap, + List> modsToMemTabled) + throws MetadataException { + + Map memTableMap = getMemTableMap(); + + // check If MemTable Contains this path + if (!memTableMap.containsKey(deviceID)) { + return; + } + + IWritableMemChunkGroup writableMemChunkGroup = memTableMap.get(deviceID); + if (isAligned) { + getMemAlignedChunkHandleFromMemTable( + deviceID, + (AlignedWritableMemChunkGroup) writableMemChunkGroup, + chunkMetadataMap, + memChunkHandleMap, + ttlLowerBound, + modsToMemTabled); + } else { + getMemChunkHandleFromMemTable( + deviceID, + (WritableMemChunkGroup) writableMemChunkGroup, + chunkMetadataMap, + memChunkHandleMap, + ttlLowerBound, + modsToMemTabled); + } + } + + private void getMemChunkHandleFromMemTable( + IDeviceID deviceID, + String measurementId, + Map> chunkMetadataMap, + Map> memChunkHandleMap, + List deletionList) { + + IWritableMemChunk memChunk = memTableMap.get(deviceID).getMemChunkMap().get(measurementId); + + TVList tvListCopy = memChunk.getSortedTvListForQuery(); + long[] timestamps = filterDeletedTimestamp(tvListCopy, deletionList); + + chunkMetadataMap + .computeIfAbsent(measurementId, k -> new ArrayList<>()) + .add( + buildChunkMetaDataForMemoryChunk( + measurementId, + timestamps[0], + timestamps[timestamps.length - 1], + Collections.emptyList())); + memChunkHandleMap + .computeIfAbsent(measurementId, k -> new ArrayList<>()) + .add(new MemChunkHandleImpl(timestamps)); + } + + private void getMemAlignedChunkHandleFromMemTable( + IDeviceID deviceID, + List schemaList, + Map> chunkMetadataList, + Map> memChunkHandleMap, + List> deletionList) { + + AlignedWritableMemChunk alignedMemChunk = + ((AlignedWritableMemChunkGroup) memTableMap.get(deviceID)).getAlignedMemChunk(); + + boolean containsMeasurement = false; + for 
(IMeasurementSchema measurementSchema : schemaList) { + if (alignedMemChunk.containsMeasurement(measurementSchema.getMeasurementId())) { + containsMeasurement = true; + break; + } + } + if (!containsMeasurement) { + return; + } + + AlignedTVList alignedTVListCopy = + (AlignedTVList) alignedMemChunk.getSortedTvListForQuery(schemaList); + + buildAlignedMemChunkHandle( + alignedTVListCopy, deletionList, schemaList, chunkMetadataList, memChunkHandleMap); + } + + private void getMemAlignedChunkHandleFromMemTable( + IDeviceID deviceID, + AlignedWritableMemChunkGroup writableMemChunkGroup, + Map> chunkMetadataList, + Map> memChunkHandleMap, + long ttlLowerBound, + List> modsToMemTabled) + throws IllegalPathException { + + AlignedWritableMemChunk memChunk = writableMemChunkGroup.getAlignedMemChunk(); + List schemaList = memChunk.getSchemaList(); + + AlignedTVList alignedTVListCopy = (AlignedTVList) memChunk.getSortedTvListForQuery(schemaList); + + List> deletionList = new ArrayList<>(); + if (modsToMemTabled != null) { + for (IMeasurementSchema schema : schemaList) { + deletionList.add( + ModificationUtils.constructDeletionList( + new MeasurementPath(deviceID, schema.getMeasurementId(), schema), + this, + modsToMemTabled, + ttlLowerBound)); + } + } + buildAlignedMemChunkHandle( + alignedTVListCopy, deletionList, schemaList, chunkMetadataList, memChunkHandleMap); + } + + private void getMemChunkHandleFromMemTable( + IDeviceID deviceID, + WritableMemChunkGroup writableMemChunkGroup, + Map> chunkMetadataMap, + Map> memChunkHandleMap, + long ttlLowerBound, + List> modsToMemTabled) + throws IllegalPathException { + + for (Entry entry : + writableMemChunkGroup.getMemChunkMap().entrySet()) { + + String measurementId = entry.getKey(); + IWritableMemChunk writableMemChunk = entry.getValue(); + TVList tvListCopy = writableMemChunk.getSortedTvListForQuery(); + + List deletionList = new ArrayList<>(); + if (modsToMemTabled != null) { + deletionList = + ModificationUtils.constructDeletionList( + new MeasurementPath(deviceID, measurementId, null), + this, + modsToMemTabled, + ttlLowerBound); + } + long[] timestamps = filterDeletedTimestamp(tvListCopy, deletionList); + chunkMetadataMap + .computeIfAbsent(measurementId, k -> new ArrayList<>()) + .add( + buildChunkMetaDataForMemoryChunk( + measurementId, + timestamps[0], + timestamps[timestamps.length - 1], + Collections.emptyList())); + memChunkHandleMap + .computeIfAbsent(measurementId, k -> new ArrayList<>()) + .add(new MemChunkHandleImpl(timestamps)); + } + } + + private void buildAlignedMemChunkHandle( + AlignedTVList alignedTVList, + List> deletionList, + List schemaList, + Map> chunkMetadataList, + Map> chunkHandleMap) { + + List> bitMaps = alignedTVList.getBitMaps(); + long[] timestamps = + alignedTVList.getTimestamps().stream().flatMapToLong(LongStream::of).toArray(); + for (int i = 0; i < schemaList.size(); i++) { + String measurement = schemaList.get(i).getMeasurementId(); + long[] startEndTime = calculateStartEndTime(timestamps, bitMaps.get(i)); + chunkMetadataList + .computeIfAbsent(measurement, k -> new ArrayList<>()) + .add( + buildChunkMetaDataForMemoryChunk( + measurement, startEndTime[0], startEndTime[1], deletionList.get(i))); + chunkHandleMap + .computeIfAbsent(measurement, k -> new ArrayList<>()) + .add( + new MemAlignedChunkHandleImpl( + timestamps, bitMaps.get(i), deletionList.get(i), startEndTime)); + } + } + + private long[] calculateStartEndTime(long[] timestamps, List bitMaps) { + long startTime = -1; + for (int i = 0; i < 
bitMaps.size(); i++) { + BitMap bitMap = bitMaps.get(i); + for (int j = 0; j < bitMap.getSize(); j++) { + if (!bitMap.isMarked(j)) { + startTime = timestamps[i]; + break; + } + } + if (startTime != -1) { + break; + } + } + + long endTime = -1; + for (int i = bitMaps.size() - 1; i >= 0; i--) { + BitMap bitMap = bitMaps.get(i); + for (int j = bitMap.getSize() - 1; j >= 0; j--) { + if (!bitMap.isMarked(j)) { + endTime = timestamps[i]; + break; + } + } + if (endTime != -1) { + break; + } + } + return new long[] {startTime, endTime}; + } + + private IChunkMetadata buildChunkMetaDataForMemoryChunk( + String measurement, long startTime, long endTime, List deletionList) { + TimeStatistics timeStatistics = new TimeStatistics(); + timeStatistics.setStartTime(startTime); + timeStatistics.setEndTime(endTime); + + // ChunkMetaData for memory is only used to get time statistics, the dataType is irrelevant. + IChunkMetadata chunkMetadata = + new ChunkMetadata(measurement, TSDataType.UNKNOWN, 0, timeStatistics); + for (TimeRange timeRange : deletionList) { + chunkMetadata.insertIntoSortedDeletions(timeRange); + } + return chunkMetadata; + } + + private long[] filterDeletedTimestamp(TVList tvList, List deletionList) { + if (deletionList.isEmpty()) { + return tvList.getTimestamps().stream().flatMapToLong(LongStream::of).toArray(); + } + + long lastTime = -1; + int[] deletionCursor = {0}; + int rowCount = tvList.rowCount(); + List result = new ArrayList<>(); + + for (int i = 0; i < rowCount; i++) { + long curTime = tvList.getTime(i); + if (!ModificationUtils.isPointDeleted(curTime, deletionList, deletionCursor) + && (i == rowCount - 1 || curTime != lastTime)) { + result.add(curTime); + } + lastTime = curTime; + } + return result.stream().mapToLong(Long::longValue).toArray(); + } + /** * Delete data by path and timeStamp. 
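
calculateStartEndTime above derives a memory chunk's time range by scanning the aligned column's bitmaps forward for the first unmarked row and backward for the last one. The simplified sketch below shows the same two-pass scan over a single null mask; a plain boolean[] stands in for the per-chunk BitMaps of AlignedTVList.

// Simplified sketch of the start/end-time scan: walk a null mask forward for
// the first live row and backward for the last one.
final class TimeRangeScan {
    static long[] startEndTime(long[] timestamps, boolean[] isNull) {
        long start = -1;
        for (int i = 0; i < timestamps.length; i++) {
            if (!isNull[i]) { start = timestamps[i]; break; }
        }
        long end = -1;
        for (int i = timestamps.length - 1; i >= 0; i--) {
            if (!isNull[i]) { end = timestamps[i]; break; }
        }
        return new long[] {start, end}; // {-1, -1} when every row is null
    }

    public static void main(String[] args) {
        long[] ts = {1, 2, 3, 4, 5};
        boolean[] isNull = {true, false, false, true, true};
        long[] range = startEndTime(ts, isNull);
        System.out.println(range[0] + ".." + range[1]); // 2..3
    }
}
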
* diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index 3160668b7c74..bf9fd47d93e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -537,4 +537,8 @@ public static AlignedWritableMemChunk deserialize(DataInputStream stream) throws AlignedTVList list = (AlignedTVList) TVList.deserialize(stream); return new AlignedWritableMemChunk(schemaList, list); } + + public List getSchemaList() { + return schemaList; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java index 8a43573950cd..d68d1ef010d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java @@ -27,8 +27,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus; import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; +import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.schema.IMeasurementSchema; @@ -122,6 +124,23 @@ ReadOnlyMemChunk query( List> modsToMemtabled) throws IOException, QueryProcessException, MetadataException; + void queryForSeriesRegionScan( + PartialPath fullPath, + long ttlLowerBound, + Map> chunkMetadataMap, + Map> memChunkHandleMap, + List> modsToMemtabled) + throws IOException, QueryProcessException, MetadataException; + + void queryForDeviceRegionScan( + IDeviceID deviceID, + boolean isAligned, + long ttlLowerBound, + Map> chunkMetadataMap, + Map> memChunkHandleMap, + List> modsToMemtabled) + throws IOException, QueryProcessException, MetadataException; + /** putBack all the memory resources. 
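
The new IMemTable region-scan methods above return their results through caller-owned maps from measurement name to chunk metadata and chunk handles, which each memtable extends via computeIfAbsent. A toy illustration of that out-parameter accumulation, using plain Strings in place of IChunkMetadata and IChunkHandle:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the out-parameter accumulation style: the caller owns the maps,
// each contributor appends its results per measurement.
final class RegionScanResult {
    final Map<String, List<String>> chunkMetadataMap = new HashMap<>();
    final Map<String, List<String>> chunkHandleMap = new HashMap<>();

    void append(String measurement, String metadata, String handle) {
        chunkMetadataMap.computeIfAbsent(measurement, k -> new ArrayList<>()).add(metadata);
        chunkHandleMap.computeIfAbsent(measurement, k -> new ArrayList<>()).add(handle);
    }

    public static void main(String[] args) {
        RegionScanResult result = new RegionScanResult();
        result.append("s1", "meta-1", "handle-1");
        result.append("s1", "meta-2", "handle-2");
        System.out.println(result.chunkMetadataMap.get("s1").size()); // 2
    }
}
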
*/ void clear(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 819d5bae8a3d..8f771f3ce039 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.PartialPath; @@ -54,7 +55,13 @@ import org.apache.iotdb.db.storageengine.dataregion.flush.NotifyFlushMemTable; import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IFileScanHandle; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.DiskAlignedChunkHandleImpl; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.DiskChunkHandleImpl; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.UnclosedFileScanHandleImpl; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.utils.SharedTimeDataBuffer; import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener; @@ -63,12 +70,14 @@ import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.db.utils.MemUtils; +import org.apache.iotdb.db.utils.ModificationUtils; import org.apache.iotdb.db.utils.datastructure.AlignedTVList; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; @@ -1612,6 +1621,319 @@ public String getStorageGroupName() { return storageGroupName; } + private void processAlignedChunkMetaDataFromFlushedMemTable( + AlignedChunkMetadata alignedChunkMetadata, + Map> measurementToChunkMetaMap, + Map> measurementToChunkHandleMap, + String filePath) { + SharedTimeDataBuffer sharedTimeDataBuffer = + new SharedTimeDataBuffer(alignedChunkMetadata.getTimeChunkMetadata()); + for (IChunkMetadata valueChunkMetaData : alignedChunkMetadata.getValueChunkMetadataList()) { + measurementToChunkMetaMap + .computeIfAbsent(valueChunkMetaData.getMeasurementUid(), k -> new ArrayList<>()) + .add(valueChunkMetaData); + measurementToChunkHandleMap + .computeIfAbsent(valueChunkMetaData.getMeasurementUid(), k -> new ArrayList<>()) + .add( + new DiskAlignedChunkHandleImpl( + filePath, + false, + 
valueChunkMetaData.getOffsetOfChunkHeader(), + valueChunkMetaData.getStatistics(), + sharedTimeDataBuffer)); + } + } + + private void processChunkMetaDataFromFlushedMemTable( + ChunkMetadata chunkMetadata, + Map> measurementToChunkMetaMap, + Map> measurementToChunkHandleMap, + String filePath) { + measurementToChunkMetaMap + .computeIfAbsent(chunkMetadata.getMeasurementUid(), k -> new ArrayList<>()) + .add(chunkMetadata); + measurementToChunkHandleMap + .computeIfAbsent(chunkMetadata.getMeasurementUid(), k -> new ArrayList<>()) + .add( + new DiskChunkHandleImpl( + filePath, + false, + chunkMetadata.getOffsetOfChunkHeader(), + chunkMetadata.getStatistics())); + } + + private void buildChunkHandleForFlushedMemTable( + List chunkMetadataList, + Map> measurementToChunkMetaList, + Map> measurementToChunkHandleList) { + for (IChunkMetadata chunkMetadata : chunkMetadataList) { + if (chunkMetadata instanceof AlignedChunkMetadata) { + processAlignedChunkMetaDataFromFlushedMemTable( + (AlignedChunkMetadata) chunkMetadata, + measurementToChunkMetaList, + measurementToChunkHandleList, + this.tsFileResource.getTsFilePath()); + } else { + processChunkMetaDataFromFlushedMemTable( + (ChunkMetadata) chunkMetadata, + measurementToChunkMetaList, + measurementToChunkHandleList, + this.tsFileResource.getTsFilePath()); + } + } + } + + private int searchTimeChunkMetaDataIndexAndSetModifications( + List> chunkMetaDataList, + IDeviceID deviceID, + List> modifications, + QueryContext context) + throws QueryProcessException { + int timeChunkMetaDataIndex = -1; + for (int i = 0; i < chunkMetaDataList.size(); i++) { + List chunkMetadata = chunkMetaDataList.get(i); + String measurement = chunkMetadata.get(0).getMeasurementUid(); + // measurement = "" means this is a timeChunkMetadata + if (measurement.isEmpty()) { + timeChunkMetaDataIndex = i; + continue; + } + + try { + modifications.add(context.getPathModifications(tsFileResource, deviceID, measurement)); + } catch (IllegalPathException e) { + throw new QueryProcessException(e.getMessage()); + } + } + return timeChunkMetaDataIndex; + } + + private List getVisibleMetadataListFromWriterByDeviceID( + QueryContext queryContext, IDeviceID deviceID) throws IllegalPathException { + + List> chunkMetaDataListForDevice = + writer.getVisibleMetadataList(deviceID, null); + List processedChunkMetadataForOneDevice = new ArrayList<>(); + for (List chunkMetadataList : chunkMetaDataListForDevice) { + if (chunkMetadataList.isEmpty()) { + continue; + } + ModificationUtils.modifyChunkMetaData( + chunkMetadataList, + queryContext.getPathModifications( + tsFileResource, deviceID, chunkMetadataList.get(0).getMeasurementUid())); + chunkMetadataList.removeIf(queryContext::chunkNotSatisfy); + processedChunkMetadataForOneDevice.addAll(chunkMetadataList); + } + return new ArrayList<>(processedChunkMetadataForOneDevice); + } + + private List getAlignedVisibleMetadataListFromWriterByDeviceID( + QueryContext queryContext, IDeviceID deviceID) throws QueryProcessException { + + List alignedChunkMetadataForOneDevice = new ArrayList<>(); + List> modifications = new ArrayList<>(); + List> chunkMetaDataListForDevice = + writer.getVisibleMetadataList(deviceID, null); + + int timeChunkMetadataListIndex = + searchTimeChunkMetaDataIndexAndSetModifications( + chunkMetaDataListForDevice, deviceID, modifications, queryContext); + if (timeChunkMetadataListIndex == -1) { + throw new QueryProcessException("TimeChunkMetadata in aligned device should not be empty"); + } + List timeChunkMetadataList = + 
chunkMetaDataListForDevice.get(timeChunkMetadataListIndex); + + for (int i = 0; i < timeChunkMetadataList.size(); i++) { + List valuesChunkMetadata = new ArrayList<>(); + boolean exits = false; + for (int j = 0; j < chunkMetaDataListForDevice.size(); j++) { + List chunkMetadataList = chunkMetaDataListForDevice.get(j); + // Filter timeChunkMetadata + if (j == timeChunkMetadataListIndex || chunkMetadataList.isEmpty()) { + continue; + } + boolean currentExist = i < chunkMetadataList.size(); + exits = (exits || currentExist); + valuesChunkMetadata.add(currentExist ? chunkMetadataList.get(i) : null); + } + if (exits) { + alignedChunkMetadataForOneDevice.add( + new AlignedChunkMetadata(timeChunkMetadataList.get(i), valuesChunkMetadata)); + } + } + + ModificationUtils.modifyAlignedChunkMetaData(alignedChunkMetadataForOneDevice, modifications); + alignedChunkMetadataForOneDevice.removeIf(queryContext::chunkNotSatisfy); + return new ArrayList<>(alignedChunkMetadataForOneDevice); + } + + public void queryForSeriesRegionScan( + List pathList, + QueryContext queryContext, + List fileScanHandlesForQuery) { + long startTime = System.nanoTime(); + try { + Map>> deviceToMemChunkHandleMap = new HashMap<>(); + Map>> deviceToChunkMetadataListMap = + new HashMap<>(); + flushQueryLock.readLock().lock(); + try { + for (PartialPath seriesPath : pathList) { + Map> measurementToChunkMetaList = new HashMap<>(); + Map> measurementToChunkHandleList = new HashMap<>(); + for (IMemTable flushingMemTable : flushingMemTables) { + if (flushingMemTable.isSignalMemTable()) { + continue; + } + flushingMemTable.queryForSeriesRegionScan( + seriesPath, + queryContext.getQueryTimeLowerBound(), + measurementToChunkMetaList, + measurementToChunkHandleList, + modsToMemtable); + if (workMemTable != null) { + workMemTable.queryForSeriesRegionScan( + seriesPath, + queryContext.getQueryTimeLowerBound(), + measurementToChunkMetaList, + measurementToChunkHandleList, + null); + } + + // Some memTable have been flushed already, so we need to get the chunk metadata from + // writer and build chunk handle for disk scanning + buildChunkHandleForFlushedMemTable( + ResourceByPathUtils.getResourceInstance(seriesPath) + .getVisibleMetadataListFromWriter(writer, tsFileResource, queryContext), + measurementToChunkMetaList, + measurementToChunkHandleList); + + IDeviceID devicePath = + DeviceIDFactory.getInstance().getDeviceID(seriesPath.getDevice()); + if (!measurementToChunkHandleList.isEmpty() || !measurementToChunkMetaList.isEmpty()) { + deviceToMemChunkHandleMap.put(devicePath, measurementToChunkHandleList); + deviceToChunkMetadataListMap.put(devicePath, measurementToChunkMetaList); + } + } + } + } catch (QueryProcessException | MetadataException | IOException e) { + logger.error( + "{}: {} get ReadOnlyMemChunk has error", + storageGroupName, + tsFileResource.getTsFile().getName(), + e); + } finally { + QUERY_RESOURCE_METRICS.recordQueryResourceNum(FLUSHING_MEMTABLE, flushingMemTables.size()); + QUERY_RESOURCE_METRICS.recordQueryResourceNum( + WORKING_MEMTABLE, workMemTable != null ? 
1 : 0); + + flushQueryLock.readLock().unlock(); + if (logger.isDebugEnabled()) { + logger.debug( + "{}: {} release flushQueryLock", + storageGroupName, + tsFileResource.getTsFile().getName()); + } + } + if (!deviceToMemChunkHandleMap.isEmpty() || !deviceToChunkMetadataListMap.isEmpty()) { + fileScanHandlesForQuery.add( + new UnclosedFileScanHandleImpl( + deviceToChunkMetadataListMap, deviceToMemChunkHandleMap, tsFileResource)); + } + } finally { + QUERY_EXECUTION_METRICS.recordExecutionCost( + GET_QUERY_RESOURCE_FROM_MEM, System.nanoTime() - startTime); + } + } + + /** + * Construct IFileScanHandle for data in memtable and the other ones in flushing memtables. Then + * get the related ChunkMetadata of data on disk. + */ + public void queryForDeviceRegionScan( + Map devicePathToAligned, + QueryContext queryContext, + List fileScanHandlesForQuery) { + long startTime = System.nanoTime(); + try { + Map>> deviceToMemChunkHandleMap = new HashMap<>(); + Map>> deviceToChunkMetadataListMap = + new HashMap<>(); + flushQueryLock.readLock().lock(); + try { + for (Map.Entry entry : devicePathToAligned.entrySet()) { + IDeviceID devicePath = entry.getKey(); + boolean isAligned = entry.getValue(); + Map> measurementToChunkMetadataList = new HashMap<>(); + Map> measurementToMemChunkHandleList = new HashMap<>(); + for (IMemTable flushingMemTable : flushingMemTables) { + if (flushingMemTable.isSignalMemTable()) { + continue; + } + flushingMemTable.queryForDeviceRegionScan( + devicePath, + isAligned, + queryContext.getQueryTimeLowerBound(), + measurementToChunkMetadataList, + measurementToMemChunkHandleList, + modsToMemtable); + } + if (workMemTable != null) { + workMemTable.queryForDeviceRegionScan( + devicePath, + isAligned, + queryContext.getQueryTimeLowerBound(), + measurementToChunkMetadataList, + measurementToMemChunkHandleList, + null); + } + + buildChunkHandleForFlushedMemTable( + isAligned + ? getAlignedVisibleMetadataListFromWriterByDeviceID(queryContext, devicePath) + : getVisibleMetadataListFromWriterByDeviceID(queryContext, devicePath), + measurementToChunkMetadataList, + measurementToMemChunkHandleList); + + if (!measurementToMemChunkHandleList.isEmpty() + || !measurementToChunkMetadataList.isEmpty()) { + deviceToMemChunkHandleMap.put(devicePath, measurementToMemChunkHandleList); + deviceToChunkMetadataListMap.put(devicePath, measurementToChunkMetadataList); + } + } + } catch (QueryProcessException | MetadataException | IOException e) { + logger.error( + "{}: {} get ReadOnlyMemChunk has error", + storageGroupName, + tsFileResource.getTsFile().getName(), + e); + } finally { + QUERY_RESOURCE_METRICS.recordQueryResourceNum(FLUSHING_MEMTABLE, flushingMemTables.size()); + QUERY_RESOURCE_METRICS.recordQueryResourceNum( + WORKING_MEMTABLE, workMemTable != null ? 1 : 0); + + flushQueryLock.readLock().unlock(); + if (logger.isDebugEnabled()) { + logger.debug( + "{}: {} release flushQueryLock", + storageGroupName, + tsFileResource.getTsFile().getName()); + } + } + + if (!deviceToMemChunkHandleMap.isEmpty() || !deviceToChunkMetadataListMap.isEmpty()) { + fileScanHandlesForQuery.add( + new UnclosedFileScanHandleImpl( + deviceToChunkMetadataListMap, deviceToMemChunkHandleMap, tsFileResource)); + } + } finally { + QUERY_EXECUTION_METRICS.recordExecutionCost( + GET_QUERY_RESOURCE_FROM_MEM, System.nanoTime() - startTime); + } + } + /** * Get the chunk(s) in the memtable (one from work memtable and the other ones in flushing * memtables and then compact them into one TimeValuePairSorter). 
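For orientation, here is a minimal caller-side sketch of the device region scan entry point added above. The `processor`, `queryContext`, and `deviceIdToIsAligned` variables are assumptions standing in for whatever the region-scan operator actually holds; only the method signature comes from this patch.

List<IFileScanHandle> fileScanHandlesForQuery = new ArrayList<>();
// deviceIdToIsAligned maps each IDeviceID to whether that device is aligned
processor.queryForDeviceRegionScan(deviceIdToIsAligned, queryContext, fileScanHandlesForQuery);
for (IFileScanHandle handle : fileScanHandlesForQuery) {
  // Each handle bundles the in-memory chunk handles of the working/flushing memtables
  // with the chunk metadata already flushed to disk by this processor's writer.
  assert !handle.isClosed();
}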
Then get the related diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/IQueryDataSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/IQueryDataSource.java new file mode 100644 index 000000000000..06ca60551751 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/IQueryDataSource.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.read; + +public interface IQueryDataSource { + + IQueryDataSource clone(); + + long getDataTTL(); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java index 01e0b76345b0..afe89c14c26d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java @@ -33,7 +33,7 @@ * The QueryDataSource contains all the seq and unseq TsFileResources for one timeseries in one * read. */ -public class QueryDataSource { +public class QueryDataSource implements IQueryDataSource { /** * TsFileResources used by read job. @@ -90,10 +90,19 @@ public List getUnseqResources() { return unseqResources; } + @Override public long getDataTTL() { return dataTTL; } + @Override + public IQueryDataSource clone() { + QueryDataSource queryDataSource = new QueryDataSource(getSeqResources(), getUnseqResources()); + queryDataSource.setSingleDevice(isSingleDevice()); + queryDataSource.setDataTTL(getDataTTL()); + return queryDataSource; + } + public void setDataTTL(long dataTTL) { this.dataTTL = dataTTL; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSourceForRegionScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSourceForRegionScan.java new file mode 100644 index 000000000000..257e294276cb --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSourceForRegionScan.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.read; + +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IFileScanHandle; + +import java.util.List; + +public class QueryDataSourceForRegionScan implements IQueryDataSource { + + private final List seqFileScanHandle; + + private final List unseqFileScanHandles; + + private long dataTTL = Long.MAX_VALUE; + + public QueryDataSourceForRegionScan( + List seqFileScanHandle, List unseqFileScanHandles) { + this.seqFileScanHandle = seqFileScanHandle; + this.unseqFileScanHandles = unseqFileScanHandles; + } + + public List getSeqFileScanHandles() { + return seqFileScanHandle; + } + + public List getUnseqFileScanHandles() { + return unseqFileScanHandles; + } + + @Override + public IQueryDataSource clone() { + QueryDataSourceForRegionScan queryDataSourceForRegionScan = + new QueryDataSourceForRegionScan(seqFileScanHandle, unseqFileScanHandles); + queryDataSourceForRegionScan.setDataTTL(getDataTTL()); + return queryDataSourceForRegionScan; + } + + @Override + public long getDataTTL() { + return dataTTL; + } + + public void setDataTTL(long dataTTL) { + this.dataTTL = dataTTL; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSourceType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSourceType.java new file mode 100644 index 000000000000..d80ce27ad7a1 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSourceType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
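With both implementations in place, a consumer that receives an IQueryDataSource can branch on the concrete type after taking its private copy via clone(). The sketch below is illustrative; scanTsFileResources and scanFileHandles are hypothetical helpers, not part of this patch.

void initQueryDataSource(IQueryDataSource shared) {
  IQueryDataSource dataSource = shared.clone(); // each operator works on its own copy
  if (dataSource instanceof QueryDataSource) {
    QueryDataSource seriesScanSource = (QueryDataSource) dataSource;
    // classic per-timeseries scan over seq/unseq TsFileResources
    scanTsFileResources(seriesScanSource.getSeqResources(), seriesScanSource.getUnseqResources());
  } else if (dataSource instanceof QueryDataSourceForRegionScan) {
    QueryDataSourceForRegionScan regionScanSource = (QueryDataSourceForRegionScan) dataSource;
    // region scan over file-level handles built by the storage engine
    scanFileHandles(
        regionScanSource.getSeqFileScanHandles(), regionScanSource.getUnseqFileScanHandles());
  }
}

Keeping clone() on the interface lets the driver hand each source operator its own copy without knowing which variant it is holding.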
+ */ +package org.apache.iotdb.db.storageengine.dataregion.read; + +public enum QueryDataSourceType { + SERIES_SCAN, + DEVICE_REGION_SCAN, + TIME_SERIES_REGION_SCAN +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/IChunkHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/IChunkHandle.java new file mode 100644 index 000000000000..b5ced3f3d374 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/IChunkHandle.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan; + +import java.io.IOException; + +/** This interface is used to handle the scan of chunks in TSFile. */ +public interface IChunkHandle { + + /** + * Check If there is more pages to be scanned in Chunk. If so, move to next page and return true + */ + boolean hasNextPage() throws IOException; + + /** Skip the current page */ + void skipCurrentPage(); + + /** + * Get the statistics time of page in Chunk. + * + * @return start time and end time of page. + */ + long[] getPageStatisticsTime(); + + /** + * Scan the data in the page and get the timestamp. It will cause disk IO if tsFile is not in + * memory. + * + * @return the iterator of timestamp. + */ + long[] getDataTime() throws IOException; +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/IFileScanHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/IFileScanHandle.java new file mode 100644 index 000000000000..53c7fcb918d2 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/IFileScanHandle.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
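The interface is meant to be driven page by page; a typical consumer loop might look like the following sketch, where `handle`, `startTime`, and `endTime` are assumed to be supplied by the caller.

while (handle.hasNextPage()) {
  long[] pageStartEndTime = handle.getPageStatisticsTime();
  if (pageStartEndTime[1] < startTime || pageStartEndTime[0] > endTime) {
    // the whole page lies outside the wanted time range, no need to decode it
    handle.skipCurrentPage();
    continue;
  }
  for (long timestamp : handle.getDataTime()) {
    // consume the timestamp, e.g. mark the device/timeseries as active in this region
  }
}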
+ */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractChunkOffset; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractDeviceChunkMetaData; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileDeviceStartEndTimeIterator; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.statistics.Statistics; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +/** + * IFileScanHandle will supply interfaces for metadata checking and chunk scanning for specified one + * TsFile. + */ +public interface IFileScanHandle { + + /** + * Get timeIndex for devices in current TsFile. + * + * @return the iterator of DeviceStartEndTime, which includes devicePath and startEndTime of this + * devicePath. + */ + TsFileDeviceStartEndTimeIterator getDeviceStartEndTimeIterator() throws IOException; + + /** + * Check whether timestamp is deleted in specified device. + * + * @param deviceID the devicePath needs to be checked. + * @param timeArray time value needed to be checked, which should be ordered. + * @return A boolean array, which indicates whether the timestamp in timeArray is deleted. + */ + boolean[] isDeviceTimeDeleted(IDeviceID deviceID, long[] timeArray) throws IllegalPathException; + + /** + * Get all the chunkMetaData in current TsFile. ChunkMetaData will be organized in device level. + * + * @return the iterator of DeviceChunkMetaData, which includes the devicePath, measurementId and + * relating chunkMetaDataList. + */ + Iterator getAllDeviceChunkMetaData() throws IOException; + + /** + * Check whether timestamp in specified timeSeries is deleted. + * + * @param deviceID the devicePath needs to be checked. + * @param timeSeriesName the timeSeries needs to be checked. + * @param timeArray time value needed to be checked, which should be ordered. + * @return A boolean array, which indicates whether the timestamp in timeArray is deleted. + */ + boolean[] isTimeSeriesTimeDeleted(IDeviceID deviceID, String timeSeriesName, long[] timeArray) + throws IllegalPathException; + + /** + * Get the chunkHandles of chunks needed to be scanned. ChunkHandles are used to read chunk. + * + * @param chunkInfoList the list of ChunkOffset, which decides the chunk needed to be scanned. + * @param statisticsList the list of Statistics, which will be used when there is only one page in + * chunk. + * @return the iterator of IChunkHandle. + */ + Iterator getChunkHandles( + List chunkInfoList, + List> statisticsList) + throws IOException; + + /** If the TsFile of this handle is closed. */ + boolean isClosed(); + + /** If the TsFile of this handle is deleted. */ + boolean isDeleted(); + + /** Get TsFileResource of current TsFile. 
*/ + TsFileResource getTsResource(); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java new file mode 100644 index 000000000000..01efa19f2c70 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; +import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; +import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; +import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IFileScanHandle; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractChunkOffset; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractDeviceChunkMetaData; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AlignedDeviceChunkMetaData; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.DeviceChunkMetaData; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; +import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileDeviceStartEndTimeIterator; +import org.apache.iotdb.db.utils.ModificationUtils; + +import org.apache.tsfile.file.metadata.AlignedChunkMetadata; +import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.read.TsFileDeviceIterator; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.utils.Pair; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class ClosedFileScanHandleImpl implements IFileScanHandle { + + private 
final TsFileResource tsFileResource; + private final QueryContext queryContext; + // Used to cache the modifications of each timeseries + private final Map>> deviceToModifications; + + public ClosedFileScanHandleImpl(TsFileResource tsFileResource, QueryContext context) { + this.tsFileResource = tsFileResource; + this.queryContext = context; + this.deviceToModifications = new HashMap<>(); + } + + @Override + public TsFileDeviceStartEndTimeIterator getDeviceStartEndTimeIterator() throws IOException { + ITimeIndex timeIndex = tsFileResource.getTimeIndex(); + return timeIndex instanceof DeviceTimeIndex + ? new TsFileDeviceStartEndTimeIterator((DeviceTimeIndex) timeIndex) + : new TsFileDeviceStartEndTimeIterator(tsFileResource.buildDeviceTimeIndex()); + } + + @Override + public boolean[] isDeviceTimeDeleted(IDeviceID deviceID, long[] timeArray) + throws IllegalPathException { + boolean[] result = new boolean[timeArray.length]; + List modifications = queryContext.getPathModifications(tsFileResource, deviceID); + List timeRangeList = + modifications.stream() + .filter(Deletion.class::isInstance) + .map(Deletion.class::cast) + .map(Deletion::getTimeRange) + .collect(Collectors.toList()); + + int[] deleteCursor = {0}; + for (int i = 0; i < timeArray.length; i++) { + result[i] = ModificationUtils.isPointDeleted(timeArray[i], timeRangeList, deleteCursor); + } + return result; + } + + private boolean[] calculateBooleanArray(List timeRangeList, long[] timeArray) { + boolean[] result = new boolean[timeArray.length]; + int[] deleteCursor = {0}; + for (int i = 0; i < timeArray.length; i++) { + result[i] = ModificationUtils.isPointDeleted(timeArray[i], timeRangeList, deleteCursor); + } + return result; + } + + @Override + public boolean[] isTimeSeriesTimeDeleted( + IDeviceID deviceID, String timeSeriesName, long[] timeArray) throws IllegalPathException { + + if (deviceToModifications.containsKey(deviceID) + && deviceToModifications.get(deviceID).containsKey(timeSeriesName)) { + return calculateBooleanArray( + deviceToModifications.get(deviceID).get(timeSeriesName), timeArray); + } + + List modifications = + queryContext.getPathModifications(tsFileResource, deviceID, timeSeriesName); + List timeRangeList = + modifications.stream() + .filter(Deletion.class::isInstance) + .map(Deletion.class::cast) + .map(Deletion::getTimeRange) + .collect(Collectors.toList()); + deviceToModifications + .computeIfAbsent(deviceID, k -> new HashMap<>()) + .put(timeSeriesName, timeRangeList); + return calculateBooleanArray(timeRangeList, timeArray); + } + + @Override + public Iterator getAllDeviceChunkMetaData() throws IOException { + + TsFileSequenceReader tsFileReader = FileReaderManager.getInstance().get(getFilePath(), true); + TsFileDeviceIterator deviceIterator = tsFileReader.getAllDevicesIteratorWithIsAligned(); + + List deviceChunkMetaDataList = new LinkedList<>(); + // Traverse each device in current tsFile and get all the relating chunkMetaData + while (deviceIterator.hasNext()) { + Pair deviceIDWithIsAligned = deviceIterator.next(); + Map, Pair>> metadataForDevice = + tsFileReader.getTimeseriesMetadataOffsetByDevice( + deviceIterator.getFirstMeasurementNodeOfCurrentDevice(), + Collections.emptySet(), + true); + if (!deviceIDWithIsAligned.right) { + // device is not aligned + deviceChunkMetaDataList.add( + new DeviceChunkMetaData( + deviceIDWithIsAligned.left, + metadataForDevice.values().stream() + .flatMap(pair -> pair.getLeft().stream()) + .collect(Collectors.toList()))); + } else { + // device is aligned + List timeChunkMetaData =
metadataForDevice.get("").getLeft(); + List> valueMetaDataList = new ArrayList<>(); + for (Map.Entry, Pair>> pair : + metadataForDevice.entrySet()) { + // Skip timeChunkMetaData + if (pair.getKey().isEmpty()) { + continue; + } + valueMetaDataList.add(pair.getValue().getLeft()); + } + + List alignedDeviceChunkMetaData = new ArrayList<>(); + for (int i = 0; i < timeChunkMetaData.size(); i++) { + alignedDeviceChunkMetaData.add( + new AlignedChunkMetadata(timeChunkMetaData.get(i), valueMetaDataList.get(i))); + } + deviceChunkMetaDataList.add( + new AlignedDeviceChunkMetaData(deviceIDWithIsAligned.left, alignedDeviceChunkMetaData)); + } + } + return deviceChunkMetaDataList.iterator(); + } + + @Override + public Iterator getChunkHandles( + List chunkInfoList, + List> statisticsList) { + String filePath = tsFileResource.getTsFilePath(); + List chunkHandleList = new ArrayList<>(); + for (int i = 0; i < chunkInfoList.size(); i++) { + AbstractChunkOffset chunkOffset = chunkInfoList.get(i); + chunkHandleList.add(chunkOffset.generateChunkHandle(filePath, statisticsList.get(i))); + } + return chunkHandleList.iterator(); + } + + @Override + public boolean isClosed() { + return true; + } + + @Override + public boolean isDeleted() { + return tsFileResource.isDeleted(); + } + + public String getFilePath() { + return tsFileResource.getTsFilePath(); + } + + @Override + public TsFileResource getTsResource() { + return tsFileResource; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/DiskAlignedChunkHandleImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/DiskAlignedChunkHandleImpl.java new file mode 100644 index 000000000000..50baf1b85e8a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/DiskAlignedChunkHandleImpl.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
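Putting the pieces together, a closed file can be scanned roughly as follows. This is a sketch that only walks non-aligned devices; aligned devices would build AlignedChunkOffsets around a SharedTimeDataBuffer instead (see the aligned sketch further below). The metadata carrier and offset classes it uses are defined later in this patch, and imports match those already present in this file.

void scanClosedFile(TsFileResource resource, QueryContext context) throws IOException {
  IFileScanHandle fileHandle = new ClosedFileScanHandleImpl(resource, context);

  List<AbstractChunkOffset> chunkOffsets = new ArrayList<>();
  List<Statistics<? extends Serializable>> chunkStatistics = new ArrayList<>();
  Iterator<AbstractDeviceChunkMetaData> deviceMetaIterator = fileHandle.getAllDeviceChunkMetaData();
  while (deviceMetaIterator.hasNext()) {
    AbstractDeviceChunkMetaData deviceMeta = deviceMetaIterator.next();
    if (deviceMeta.isAligned()) {
      continue; // aligned devices are handled via AlignedChunkOffset, omitted here
    }
    for (IChunkMetadata chunkMeta :
        ((DeviceChunkMetaData) deviceMeta).getMeasurementChunkMetadataMap()) {
      chunkOffsets.add(
          new ChunkOffset(
              chunkMeta.getOffsetOfChunkHeader(),
              deviceMeta.getDevicePath(),
              chunkMeta.getMeasurementUid()));
      chunkStatistics.add(chunkMeta.getStatistics());
    }
  }

  Iterator<IChunkHandle> chunkHandles = fileHandle.getChunkHandles(chunkOffsets, chunkStatistics);
  while (chunkHandles.hasNext()) {
    IChunkHandle chunkHandle = chunkHandles.next();
    // iterate pages as in the IChunkHandle sketch above
  }
}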

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl; + +import org.apache.iotdb.db.storageengine.dataregion.utils.SharedTimeDataBuffer; + +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.reader.chunk.ChunkReader; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; + +public class DiskAlignedChunkHandleImpl extends DiskChunkHandleImpl { + private static final int MASK = 0x80; + + private final SharedTimeDataBuffer sharedTimeDataBuffer; + private int pageIndex = 0; + + public DiskAlignedChunkHandleImpl( + String filePath, + boolean isTsFileClosed, + long offset, + Statistics chunkStatistic, + SharedTimeDataBuffer sharedTimeDataBuffer) { + super(filePath, isTsFileClosed, offset, chunkStatistic); + this.sharedTimeDataBuffer = sharedTimeDataBuffer; + } + + @Override + protected void init(TsFileSequenceReader reader) throws IOException { + sharedTimeDataBuffer.init(reader); + super.init(reader); + } + + @Override + public long[] getDataTime() throws IOException { + ByteBuffer currentPageDataBuffer = + ChunkReader.deserializePageData( + this.currentPageHeader, this.currentChunkDataBuffer, this.currentChunkHeader); + int size = ReadWriteIOUtils.readInt(currentPageDataBuffer); + byte[] bitmap = new byte[(size + 7) / 8]; + currentPageDataBuffer.get(bitmap); + + long[] timeData = sharedTimeDataBuffer.getPageTime(pageIndex); + if (timeData.length != size) { + throw new UnsupportedOperationException("Time data size not match"); + } + + long[] validTimeList = new long[(int) currentPageHeader.getNumOfValues()]; + for (int i = 0; i < size; i++) { + if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) { + continue; + } + long timestamp = timeData[i]; + validTimeList[i] = timestamp; + } + + pageIndex++; + return validTimeList; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/DiskChunkHandleImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/DiskChunkHandleImpl.java new file mode 100644 index 000000000000..429cef7ee5b7 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/DiskChunkHandleImpl.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
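One detail worth spelling out in the aligned handle above: the value page starts with a row count and a validity bitmap, and a timestamp is emitted only when the corresponding bit is set, MSB first within each byte. A tiny worked example of that mask arithmetic, with an assumed one-byte bitmap:

// bitmap byte 0xA0 = 0b1010_0000: rows 0 and 2 hold a value, all other rows are null
byte[] bitmap = {(byte) 0xA0};
for (int i = 0; i < 8; i++) {
  boolean rowHasValue = ((bitmap[i / 8] & 0xFF) & (0x80 >>> (i % 8))) != 0;
  // rowHasValue is true only for i == 0 and i == 2,
  // which mirrors the "== 0 -> continue" check in getDataTime() above
}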
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl; + +import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle; + +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.encoding.decoder.Decoder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.MetaMarker; +import org.apache.tsfile.file.header.ChunkHeader; +import org.apache.tsfile.file.header.PageHeader; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.Chunk; +import org.apache.tsfile.read.reader.chunk.ChunkReader; +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; + +/** It will receive a list of offset and execute sequential scan of TsFile for chunkData. */ +public class DiskChunkHandleImpl implements IChunkHandle { + private final boolean tsFileClosed; + private final String filePath; + protected ChunkHeader currentChunkHeader; + protected PageHeader currentPageHeader; + protected ByteBuffer currentChunkDataBuffer; + protected long offset; + + private final Decoder defaultTimeDecoder = + Decoder.getDecoderByType( + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), + TSDataType.INT64); + + // Page will reuse chunkStatistics if there is only one page in chunk + protected final Statistics chunkStatistic; + + public DiskChunkHandleImpl( + String filePath, + boolean isTsFileClosed, + long offset, + Statistics chunkStatistics) { + this.chunkStatistic = chunkStatistics; + this.offset = offset; + this.filePath = filePath; + this.tsFileClosed = isTsFileClosed; + } + + protected void init(TsFileSequenceReader reader) throws IOException { + if (currentChunkDataBuffer != null) { + return; + } + Chunk chunk = reader.readMemChunk(offset); + this.currentChunkDataBuffer = chunk.getData(); + this.currentChunkHeader = chunk.getHeader(); + } + + // Check if there is more pages to be scanned in Chunk. 
+ // If so, deserialize the page header + @Override + public boolean hasNextPage() throws IOException { + // read chunk from disk if needed + if (currentChunkDataBuffer == null) { + TsFileSequenceReader reader = FileReaderManager.getInstance().get(filePath, tsFileClosed); + init(reader); + } + + if (!currentChunkDataBuffer.hasRemaining()) { + return false; + } + // If there is only one page, page statistics is not stored in the chunk header, which is the + // same as chunkStatistics + if ((byte) (this.currentChunkHeader.getChunkType() & 0x3F) + == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) { + currentPageHeader = + PageHeader.deserializeFrom(this.currentChunkDataBuffer, this.chunkStatistic); + } else { + currentPageHeader = + PageHeader.deserializeFrom( + this.currentChunkDataBuffer, this.currentChunkHeader.getDataType()); + } + return true; + } + + @Override + public void skipCurrentPage() { + currentChunkDataBuffer.position( + currentChunkDataBuffer.position() + currentPageHeader.getCompressedSize()); + } + + @Override + public long[] getPageStatisticsTime() { + return new long[] {currentPageHeader.getStartTime(), currentPageHeader.getEndTime()}; + } + + @Override + public long[] getDataTime() throws IOException { + ByteBuffer currentPageDataBuffer = + ChunkReader.deserializePageData( + currentPageHeader, this.currentChunkDataBuffer, this.currentChunkHeader); + int timeBufferLength = ReadWriteForEncodingUtils.readUnsignedVarInt(currentPageDataBuffer); + ByteBuffer timeBuffer = currentPageDataBuffer.slice(); + timeBuffer.limit(timeBufferLength); + + return convertToTimeArray(timeBuffer); + } + + private long[] convertToTimeArray(ByteBuffer timeBuffer) throws IOException { + long[] timeArray = new long[(int) currentPageHeader.getNumOfValues()]; + int index = 0; + while (defaultTimeDecoder.hasNext(timeBuffer)) { + timeArray[index++] = defaultTimeDecoder.readLong(timeBuffer); + } + return timeArray; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/MemAlignedChunkHandleImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/MemAlignedChunkHandleImpl.java new file mode 100644 index 000000000000..868950c717ec --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/MemAlignedChunkHandleImpl.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
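A single on-disk chunk can also be scanned in isolation; the handle only touches the file lazily, fetching the TsFileSequenceReader from FileReaderManager on the first hasNextPage() call. A sketch, assuming `tsFilePath` and a `chunkMetadata` taken from a closed file (exception handling omitted):

IChunkHandle chunkHandle =
    new DiskChunkHandleImpl(
        tsFilePath,
        true, // the backing TsFile is closed
        chunkMetadata.getOffsetOfChunkHeader(),
        chunkMetadata.getStatistics());
List<Long> timestamps = new ArrayList<>();
while (chunkHandle.hasNextPage()) {
  for (long t : chunkHandle.getDataTime()) {
    timestamps.add(t);
  }
}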
+ */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl; + +import org.apache.iotdb.db.utils.ModificationUtils; + +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.utils.BitMap; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE; + +public class MemAlignedChunkHandleImpl extends MemChunkHandleImpl { + + private final List bitMapOfValue; + private final List deletionList; + // start time and end time of the chunk according to bitMap + private final long[] startEndTime; + + public MemAlignedChunkHandleImpl( + long[] dataOfTimestamp, + List bitMapOfValue, + List deletionList, + long[] startEndTime) { + super(dataOfTimestamp); + this.bitMapOfValue = bitMapOfValue; + this.deletionList = deletionList; + this.startEndTime = startEndTime; + } + + @Override + public long[] getPageStatisticsTime() { + return startEndTime; + } + + @Override + public long[] getDataTime() throws IOException { + List timeList = new ArrayList<>(); + int[] deletionCursor = {0}; + for (int i = 0; i < dataOfTimestamp.length; i++) { + int arrayIndex = i / ARRAY_SIZE; + int elementIndex = i % ARRAY_SIZE; + if (!bitMapOfValue.get(arrayIndex).isMarked(elementIndex) + && !ModificationUtils.isPointDeleted(dataOfTimestamp[i], deletionList, deletionCursor) + && (i == dataOfTimestamp.length - 1 || dataOfTimestamp[i] != dataOfTimestamp[i + 1])) { + timeList.add(dataOfTimestamp[i]); + } + } + hasRead = true; + return timeList.stream().mapToLong(Long::longValue).toArray(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/MemChunkHandleImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/MemChunkHandleImpl.java new file mode 100644 index 000000000000..e84dd071a40e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/MemChunkHandleImpl.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
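The in-memory aligned handle filters three things in one pass: rows whose value bitmap is unset, rows whose timestamp falls inside a deletion range, and duplicated timestamps, of which only the last occurrence is kept. A small illustrative case, using org.apache.tsfile.utils.BitMap and org.apache.tsfile.read.common.TimeRange and assuming the default ARRAY_SIZE is larger than this toy column:

long[] times = {1L, 2L, 2L, 3L, 4L};
BitMap valueBitMap = new BitMap(times.length);
valueBitMap.mark(3); // the row with timestamp 3 holds a null value
List<TimeRange> deletions = Collections.singletonList(new TimeRange(4L, 4L));

IChunkHandle handle =
    new MemAlignedChunkHandleImpl(
        times, Collections.singletonList(valueBitMap), deletions, new long[] {1L, 2L});
// handle.getDataTime() -> [1, 2]: the first duplicate of 2 is dropped, 3 is null, 4 is deleted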
+ */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl; + +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle; + +import java.io.IOException; + +public class MemChunkHandleImpl implements IChunkHandle { + protected final long[] dataOfTimestamp; + + protected boolean hasRead = false; + + public MemChunkHandleImpl(long[] dataOfTimestamp) { + this.dataOfTimestamp = dataOfTimestamp; + } + + @Override + public boolean hasNextPage() throws IOException { + return !hasRead; + } + + // MemChunk only has one page in handle + @Override + public void skipCurrentPage() { + hasRead = true; + } + + @Override + public long[] getPageStatisticsTime() { + return new long[] {dataOfTimestamp[0], dataOfTimestamp[dataOfTimestamp.length - 1]}; + } + + @Override + public long[] getDataTime() throws IOException { + hasRead = true; + return dataOfTimestamp; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java new file mode 100644 index 000000000000..fef83dd0d7eb --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl; + +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IFileScanHandle; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractChunkOffset; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractDeviceChunkMetaData; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AlignedDeviceChunkMetaData; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.ChunkOffset; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.DeviceChunkMetaData; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; +import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileDeviceStartEndTimeIterator; +import org.apache.iotdb.db.utils.ModificationUtils; + +import org.apache.tsfile.file.metadata.AlignedChunkMetadata; +import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.statistics.Statistics; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class UnclosedFileScanHandleImpl implements IFileScanHandle { + + private final TsFileResource tsFileResource; + private final Map>> deviceToChunkMetadataMap; + private final Map>> deviceToMemChunkHandleMap; + + public UnclosedFileScanHandleImpl( + Map>> deviceToChunkMetadataMap, + Map>> deviceToMemChunkHandleMap, + TsFileResource tsFileResource) { + this.deviceToChunkMetadataMap = deviceToChunkMetadataMap; + this.deviceToMemChunkHandleMap = deviceToMemChunkHandleMap; + this.tsFileResource = tsFileResource; + } + + @Override + public TsFileDeviceStartEndTimeIterator getDeviceStartEndTimeIterator() throws IOException { + ITimeIndex timeIndex = tsFileResource.getTimeIndex(); + return timeIndex instanceof DeviceTimeIndex + ? 
new TsFileDeviceStartEndTimeIterator((DeviceTimeIndex) timeIndex) + : new TsFileDeviceStartEndTimeIterator(tsFileResource.buildDeviceTimeIndex()); + } + + @Override + public boolean[] isDeviceTimeDeleted(IDeviceID deviceID, long[] timeArray) { + Map> chunkMetadataMap = deviceToChunkMetadataMap.get(deviceID); + boolean[] result = new boolean[timeArray.length]; + + chunkMetadataMap.values().stream() + .flatMap(List::stream) + .map(IChunkMetadata::getDeleteIntervalList) + .filter(deleteIntervalList -> !deleteIntervalList.isEmpty()) + .forEach( + timeRangeList -> { + int[] deleteCursor = {0}; + for (int i = 0; i < timeArray.length; i++) { + if (!result[i] + && ModificationUtils.isPointDeleted( + timeArray[i], timeRangeList, deleteCursor)) { + result[i] = true; + } + } + }); + return result; + } + + @Override + public Iterator getAllDeviceChunkMetaData() throws IOException { + List deviceChunkMetaDataList = new ArrayList<>(); + for (Map.Entry>> entry : + deviceToChunkMetadataMap.entrySet()) { + IDeviceID deviceID = entry.getKey(); + Map> chunkMetadataList = entry.getValue(); + if (chunkMetadataList.isEmpty()) { + continue; + } + + boolean isAligned = chunkMetadataList.containsKey(""); + if (isAligned) { + List alignedChunkMetadataList = new ArrayList<>(); + List timeChunkMetadataList = chunkMetadataList.get(""); + List> valueChunkMetadataList = + new ArrayList<>(chunkMetadataList.values()); + for (int i = 0; i < timeChunkMetadataList.size(); i++) { + alignedChunkMetadataList.add( + new AlignedChunkMetadata( + timeChunkMetadataList.get(i), valueChunkMetadataList.get(i))); + } + deviceChunkMetaDataList.add( + new AlignedDeviceChunkMetaData(deviceID, alignedChunkMetadataList)); + } else { + for (Map.Entry> measurementMetaData : + chunkMetadataList.entrySet()) { + deviceChunkMetaDataList.add( + new DeviceChunkMetaData(deviceID, measurementMetaData.getValue())); + } + } + } + return deviceChunkMetaDataList.iterator(); + } + + @Override + public boolean[] isTimeSeriesTimeDeleted( + IDeviceID deviceID, String timeSeriesName, long[] timeArray) { + List chunkMetadataList = + deviceToChunkMetadataMap.get(deviceID).get(timeSeriesName); + boolean[] result = new boolean[timeArray.length]; + chunkMetadataList.stream() + .map(IChunkMetadata::getDeleteIntervalList) + .filter(deleteIntervalList -> !deleteIntervalList.isEmpty()) + .forEach( + timeRangeList -> { + int[] deleteCursor = {0}; + for (int i = 0; i < timeArray.length; i++) { + if (!result[i] + && ModificationUtils.isPointDeleted( + timeArray[i], timeRangeList, deleteCursor)) { + result[i] = true; + } + } + }); + return result; + } + + @Override + public Iterator getChunkHandles( + List chunkInfoList, + List> statisticsList) { + List chunkHandleList = new ArrayList<>(); + for (AbstractChunkOffset chunkOffsetInfo : chunkInfoList) { + List chunkHandle = + deviceToMemChunkHandleMap + .get(chunkOffsetInfo.getDevicePath()) + .get(((ChunkOffset) chunkOffsetInfo).getMeasurement()); + chunkHandleList.addAll(chunkHandle); + } + return chunkHandleList.iterator(); + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public boolean isDeleted() { + return tsFileResource.isDeleted(); + } + + @Override + public TsFileResource getTsResource() { + return tsFileResource; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/AbstractChunkOffset.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/AbstractChunkOffset.java new 
file mode 100644 index 000000000000..93c804c7322e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/AbstractChunkOffset.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan.model; + +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.statistics.Statistics; + +import java.io.Serializable; + +public abstract class AbstractChunkOffset { + + private final IDeviceID devicePath; + private final long offSet; + + protected AbstractChunkOffset(long offSet, IDeviceID devicePath) { + this.offSet = offSet; + this.devicePath = devicePath; + } + + public abstract IChunkHandle generateChunkHandle( + String filePath, Statistics statistics); + + public IDeviceID getDevicePath() { + return devicePath; + } + + public long getOffSet() { + return offSet; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/AbstractDeviceChunkMetaData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/AbstractDeviceChunkMetaData.java new file mode 100644 index 000000000000..cac85810634c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/AbstractDeviceChunkMetaData.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan.model; + +import org.apache.tsfile.file.metadata.IDeviceID; + +public abstract class AbstractDeviceChunkMetaData { + private final IDeviceID devicePath; + + public AbstractDeviceChunkMetaData(IDeviceID devicePath) { + this.devicePath = devicePath; + } + + public IDeviceID getDevicePath() { + return devicePath; + } + + public abstract boolean isAligned(); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/AlignedChunkOffset.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/AlignedChunkOffset.java new file mode 100644 index 000000000000..a145d3e47aaa --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/AlignedChunkOffset.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan.model; + +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.DiskAlignedChunkHandleImpl; +import org.apache.iotdb.db.storageengine.dataregion.utils.SharedTimeDataBuffer; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.statistics.Statistics; + +import java.io.Serializable; + +public class AlignedChunkOffset extends AbstractChunkOffset { + + // Used by aligned device to share the same time buffer + private final SharedTimeDataBuffer sharedTimeDataBuffer; + + public AlignedChunkOffset( + long offSet, IDeviceID devicePath, SharedTimeDataBuffer sharedTimeDataBuffer) { + super(offSet, devicePath); + this.sharedTimeDataBuffer = sharedTimeDataBuffer; + } + + public SharedTimeDataBuffer getSharedTimeDataBuffer() { + return sharedTimeDataBuffer; + } + + @Override + public IChunkHandle generateChunkHandle( + String filePath, Statistics statistics) { + return new DiskAlignedChunkHandleImpl( + filePath, true, getOffSet(), statistics, sharedTimeDataBuffer); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/AlignedDeviceChunkMetaData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/AlignedDeviceChunkMetaData.java new file mode 100644 index 000000000000..39fd103d0ab9 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/AlignedDeviceChunkMetaData.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.storageengine.dataregion.read.filescan.model; + +import org.apache.tsfile.file.metadata.AlignedChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; + +import java.util.List; + +public class AlignedDeviceChunkMetaData extends AbstractDeviceChunkMetaData { + + List alignedChunkMetadataList; + + public AlignedDeviceChunkMetaData( + IDeviceID devicePath, List alignedChunkMetadataList) { + super(devicePath); + this.alignedChunkMetadataList = alignedChunkMetadataList; + } + + public List getAlignedChunkMetadataList() { + return alignedChunkMetadataList; + } + + @Override + public boolean isAligned() { + return true; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/ChunkOffset.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/ChunkOffset.java new file mode 100644 index 000000000000..7d2ec8a63e27 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/ChunkOffset.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
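For an aligned device, the offsets handed to getChunkHandles are built so that all value chunks of one aligned chunk share a single SharedTimeDataBuffer, meaning the time column is read and decoded only once per chunk. A sketch of that construction, with illustrative variable names; `alignedDeviceMeta` stands for an AlignedDeviceChunkMetaData as defined above:

List<AbstractChunkOffset> chunkOffsets = new ArrayList<>();
List<Statistics<? extends Serializable>> chunkStatistics = new ArrayList<>();
for (AlignedChunkMetadata alignedMeta : alignedDeviceMeta.getAlignedChunkMetadataList()) {
  SharedTimeDataBuffer timeBuffer = new SharedTimeDataBuffer(alignedMeta.getTimeChunkMetadata());
  for (IChunkMetadata valueMeta : alignedMeta.getValueChunkMetadataList()) {
    if (valueMeta == null) {
      continue; // this measurement has no data in this particular chunk
    }
    chunkOffsets.add(
        new AlignedChunkOffset(
            valueMeta.getOffsetOfChunkHeader(), alignedDeviceMeta.getDevicePath(), timeBuffer));
    chunkStatistics.add(valueMeta.getStatistics());
  }
}
// chunkOffsets and chunkStatistics are then passed to IFileScanHandle.getChunkHandles(...)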
+ */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan.model; + +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.DiskChunkHandleImpl; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.statistics.Statistics; + +import java.io.Serializable; + +public class ChunkOffset extends AbstractChunkOffset { + + private final String measurement; + + public ChunkOffset(long offset, IDeviceID deviceID, String measurement) { + super(offset, deviceID); + this.measurement = measurement; + } + + public String getMeasurement() { + return measurement; + } + + @Override + public IChunkHandle generateChunkHandle( + String filePath, Statistics statistics) { + return new DiskChunkHandleImpl(filePath, true, getOffSet(), statistics); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/DeviceChunkMetaData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/DeviceChunkMetaData.java new file mode 100644 index 000000000000..373412603393 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/DeviceChunkMetaData.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan.model; + +import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; + +import java.util.List; + +public class DeviceChunkMetaData extends AbstractDeviceChunkMetaData { + + private final List measurementChunkMetadata; + + public DeviceChunkMetaData( + IDeviceID devicePath, List measurementChunkMetadataMap) { + super(devicePath); + this.measurementChunkMetadata = measurementChunkMetadataMap; + } + + public List getMeasurementChunkMetadataMap() { + return measurementChunkMetadata; + } + + @Override + public boolean isAligned() { + return false; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/DeviceStartEndTime.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/DeviceStartEndTime.java new file mode 100644 index 000000000000..3c036baddb71 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/model/DeviceStartEndTime.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan.model; + +import org.apache.tsfile.file.metadata.IDeviceID; + +public class DeviceStartEndTime { + private final IDeviceID devicePath; + private final long startTime; + private final long endTime; + + public DeviceStartEndTime(IDeviceID devicePath, long startTime, long endTime) { + this.devicePath = devicePath; + this.startTime = startTime; + this.endTime = endTime; + } + + public IDeviceID getDevicePath() { + return devicePath; + } + + public long getEndTime() { + return endTime; + } + + public long getStartTime() { + return startTime; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/SharedTimeDataBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/SharedTimeDataBuffer.java new file mode 100644 index 000000000000..606037b529da --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/SharedTimeDataBuffer.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils;
+
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.reader.chunk.ChunkReader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SharedTimeDataBuffer {
+  private ByteBuffer timeBuffer;
+  private final IChunkMetadata timeChunkMetaData;
+  private ChunkHeader timeChunkHeader;
+  // Decoded timestamps of each time page, indexed by page id
+  private final List<long[]> timeData;
+  private final Decoder defaultTimeDecoder =
+      Decoder.getDecoderByType(
+          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+          TSDataType.INT64);
+
+  public SharedTimeDataBuffer(IChunkMetadata timeChunkMetaData) {
+    this.timeChunkMetaData = timeChunkMetaData;
+    this.timeData = new ArrayList<>();
+  }
+
+  // It should be called before any other method of SharedTimeDataBuffer.
+  public void init(TsFileSequenceReader reader) throws IOException {
+    if (timeBuffer != null) {
+      return;
+    }
+    Chunk timeChunk = reader.readMemChunk(timeChunkMetaData.getOffsetOfChunkHeader());
+    timeChunkHeader = timeChunk.getHeader();
+    timeBuffer = timeChunk.getData();
+  }
+
+  public long[] getPageTime(int pageId) throws IOException {
+    int size = timeData.size();
+    if (pageId < size) {
+      return timeData.get(pageId);
+    } else if (pageId == size) {
+      loadPageData();
+      return timeData.get(pageId);
+    } else {
+      throw new UnsupportedOperationException(
+          "PageId in SharedTimeDataBuffer should be incremental.");
+    }
+  }
+
+  private void loadPageData() throws IOException {
+    if (!timeBuffer.hasRemaining()) {
+      throw new UnsupportedOperationException("No more data in SharedTimeDataBuffer");
+    }
+    PageHeader timePageHeader =
+        PageHeader.deserializeFrom(timeBuffer, timeChunkHeader.getDataType());
+    ByteBuffer timePageData =
+        ChunkReader.deserializePageData(timePageHeader, timeBuffer, timeChunkHeader);
+    long[] pageData = new long[(int) timePageHeader.getNumOfValues()];
+    int index = 0;
+    while (defaultTimeDecoder.hasNext(timePageData)) {
+      // Advance the index for every decoded timestamp; otherwise only pageData[0] would be filled
+      pageData[index++] = defaultTimeDecoder.readLong(timePageData);
+    }
+    timeData.add(pageData);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileDeviceStartEndTimeIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileDeviceStartEndTimeIterator.java
new file mode 100644
index 000000000000..c641ec72032b
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileDeviceStartEndTimeIterator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils;
+
+import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.DeviceStartEndTime;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+
+import java.util.Iterator;
+
+/**
+ * This class is used to iterate over the DeviceTimeIndex of a TsFile to get the start and end
+ * times of each device.
+ */
+public class TsFileDeviceStartEndTimeIterator {
+
+  private final DeviceTimeIndex deviceTimeIndex;
+  private final Iterator<IDeviceID> currentDevice;
+
+  public TsFileDeviceStartEndTimeIterator(DeviceTimeIndex deviceTimeIndex) {
+    this.deviceTimeIndex = deviceTimeIndex;
+    this.currentDevice = deviceTimeIndex.getDevices().iterator();
+  }
+
+  public boolean hasNext() {
+    return currentDevice.hasNext();
+  }
+
+  public DeviceStartEndTime next() {
+    IDeviceID deviceID = currentDevice.next();
+    return new DeviceStartEndTime(
+        deviceID, deviceTimeIndex.getStartTime(deviceID), deviceTimeIndex.getEndTime(deviceID));
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
index 230d33dc7a7e..9e69c6d5cd34 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
@@ -19,13 +19,19 @@
 
 package org.apache.iotdb.db.utils;
 
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 
 import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.tsfile.file.metadata.IChunkMetadata;
 import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.utils.Pair;
 
+import java.util.ArrayList;
 import java.util.List;
 
 public class ModificationUtils {
@@ -140,10 +146,101 @@ public static void modifyAlignedChunkMetaData(
         });
   }
 
+  // Check whether the timestamp is deleted in the deletionList.
+  // Both the timestamps and the deletionList must be ordered, and deleteCursor is a
+  // single-element array holding the current index into the deletionList.
+  public static boolean isPointDeleted(
+      long timestamp, List<TimeRange> deletionList, int[] deleteCursor) {
+    if (deleteCursor.length != 1) {
+      throw new IllegalArgumentException("deleteCursor should be an array whose size is 1");
+    }
+    while (deletionList != null && deleteCursor[0] < deletionList.size()) {
+      if (deletionList.get(deleteCursor[0]).contains(timestamp)) {
+        return true;
+      } else if (deletionList.get(deleteCursor[0]).getMax() < timestamp) {
+        deleteCursor[0]++;
+      } else {
+        return false;
+      }
+    }
+    return false;
+  }
+
+  public static boolean isPointDeleted(long timestamp, List<TimeRange> deletionList) {
+    int[] deleteCursor = {0};
+    return isPointDeleted(timestamp, deletionList, deleteCursor);
+  }
+
   private static void doModifyChunkMetaData(Modification modification, IChunkMetadata metaData) {
     if (modification instanceof Deletion) {
       Deletion deletion = (Deletion) modification;
       metaData.insertIntoSortedDeletions(deletion.getTimeRange());
     }
   }
+
+  /** Methods for handling modifications on a memtable. */
+  public static List<List<TimeRange>> constructDeletionList(
+      AlignedPath partialPath,
+      IMemTable memTable,
+      List<Pair<Modification, IMemTable>> modsToMemtable,
+      long timeLowerBound) {
+    List<List<TimeRange>> deletionList = new ArrayList<>();
+    for (String measurement : partialPath.getMeasurementList()) {
+      List<TimeRange> columnDeletionList = new ArrayList<>();
+      columnDeletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound));
+      for (Modification modification :
+          ModificationUtils.getModificationsForMemtable(memTable, modsToMemtable)) {
+        if (modification instanceof Deletion) {
+          Deletion deletion = (Deletion) modification;
+          PartialPath fullPath = partialPath.concatNode(measurement);
+          if (deletion.getPath().matchFullPath(fullPath)
+              && deletion.getEndTime() > timeLowerBound) {
+            long lowerBound = Math.max(deletion.getStartTime(), timeLowerBound);
+            columnDeletionList.add(new TimeRange(lowerBound, deletion.getEndTime()));
+          }
+        }
+      }
+      deletionList.add(TimeRange.sortAndMerge(columnDeletionList));
+    }
+    return deletionList;
+  }
+
+  /**
+   * Construct a deletion list from a memtable.
+   *
+   * @param memTable memtable
+   * @param timeLowerBound time watermark
+   */
+  public static List<TimeRange> constructDeletionList(
+      MeasurementPath partialPath,
+      IMemTable memTable,
+      List<Pair<Modification, IMemTable>> modsToMemtable,
+      long timeLowerBound) {
+    List<TimeRange> deletionList = new ArrayList<>();
+    deletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound));
+    for (Modification modification : getModificationsForMemtable(memTable, modsToMemtable)) {
+      if (modification instanceof Deletion) {
+        Deletion deletion = (Deletion) modification;
+        if (deletion.getPath().matchFullPath(partialPath)
+            && deletion.getEndTime() > timeLowerBound) {
+          long lowerBound = Math.max(deletion.getStartTime(), timeLowerBound);
+          deletionList.add(new TimeRange(lowerBound, deletion.getEndTime()));
+        }
+      }
+    }
+    return TimeRange.sortAndMerge(deletionList);
+  }
+
+  private static List<Modification> getModificationsForMemtable(
+      IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable) {
+    List<Modification> modifications = new ArrayList<>();
+    boolean foundMemtable = false;
+    for (Pair<Modification, IMemTable> entry : modsToMemtable) {
+      if (foundMemtable || entry.right.equals(memTable)) {
+        modifications.add(entry.left);
+        foundMemtable = true;
+      }
+    }
+    return modifications;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index fedd841470a8..59fd47091389 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -47,6 +47,7 @@
 import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
 import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM;
 import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
+import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted;
 import static org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
 import static org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
 
@@ -939,7 +940,7 @@
public TsBlock buildTsBlock( // value columns for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) { - int deleteCursor = 0; + int[] deleteCursor = {0}; // Pair of Time and Index Pair lastValidPointIndexForTimeDupCheck = null; if (Objects.nonNull(timeDuplicateInfo)) { @@ -1260,4 +1261,8 @@ public BitMap getRowBitMap() { return new BitMap(rowCount, rowBitsArr); } + + public List> getBitMaps() { + return bitMaps; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java index 8f90296d774e..b11d7f14f54e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java @@ -40,6 +40,7 @@ import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE; import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM; import static org.apache.iotdb.db.utils.MemUtils.getBinarySize; +import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted; public abstract class BinaryTVList extends TVList { // list of primitive array, add 1 when expanded -> Binary primitive array @@ -185,7 +186,7 @@ protected void writeValidValuesIntoTsBlock( int floatPrecision, TSEncoding encoding, List deletionList) { - Integer deleteCursor = 0; + int[] deleteCursor = {0}; for (int i = 0; i < rowCount; i++) { if (!isPointDeleted(getTime(i), deletionList, deleteCursor) && (i == rowCount - 1 || getTime(i) != getTime(i + 1))) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java index 28e1b54dc8eb..00b0c3de8fd6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java @@ -38,6 +38,7 @@ import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE; import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM; +import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted; public abstract class BooleanTVList extends TVList { // list of primitive array, add 1 when expanded -> Binary primitive array @@ -144,7 +145,7 @@ protected void writeValidValuesIntoTsBlock( int floatPrecision, TSEncoding encoding, List deletionList) { - Integer deleteCursor = 0; + int[] deleteCursor = {0}; for (int i = 0; i < rowCount; i++) { if (!isPointDeleted(getTime(i), deletionList, deleteCursor) && (i == rowCount - 1 || getTime(i) != getTime(i + 1))) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java index c72150db27e4..b7cc3336d10a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java @@ -38,6 +38,7 @@ import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE; import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM; 
+import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted; public abstract class DoubleTVList extends TVList { // list of primitive array, add 1 when expanded -> double primitive array @@ -147,7 +148,7 @@ protected void writeValidValuesIntoTsBlock( int floatPrecision, TSEncoding encoding, List deletionList) { - Integer deleteCursor = 0; + int[] deleteCursor = {0}; for (int i = 0; i < rowCount; i++) { if (!isPointDeleted(getTime(i), deletionList, deleteCursor) && (i == rowCount - 1 || getTime(i) != getTime(i + 1))) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java index 86e2b36d8816..43a208ecccfd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java @@ -38,6 +38,7 @@ import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE; import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM; +import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted; public abstract class FloatTVList extends TVList { // list of primitive array, add 1 when expanded -> float primitive array @@ -147,7 +148,7 @@ protected void writeValidValuesIntoTsBlock( int floatPrecision, TSEncoding encoding, List deletionList) { - Integer deleteCursor = 0; + int[] deleteCursor = {0}; for (int i = 0; i < rowCount; i++) { if (!isPointDeleted(getTime(i), deletionList, deleteCursor) && (i == rowCount - 1 || getTime(i) != getTime(i + 1))) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java index 033e88a9e1cb..162f1fd1112b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java @@ -37,6 +37,7 @@ import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE; import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM; +import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted; public abstract class IntTVList extends TVList { // list of primitive array, add 1 when expanded -> int primitive array @@ -142,7 +143,7 @@ protected void writeValidValuesIntoTsBlock( int floatPrecision, TSEncoding encoding, List deletionList) { - Integer deleteCursor = 0; + int[] deleteCursor = {0}; for (int i = 0; i < rowCount; i++) { if (!isPointDeleted(getTime(i), deletionList, deleteCursor) && (i == rowCount - 1 || getTime(i) != getTime(i + 1))) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java index 4adaa0b978c5..4c8f4ccea9e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java @@ -37,6 +37,7 @@ import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE; import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM; 
+import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted; public abstract class LongTVList extends TVList { // list of primitive array, add 1 when expanded -> long primitive array @@ -142,7 +143,7 @@ protected void writeValidValuesIntoTsBlock( int floatPrecision, TSEncoding encoding, List deletionList) { - Integer deleteCursor = 0; + int[] deleteCursor = {0}; for (int i = 0; i < rowCount; i++) { if (!isPointDeleted(getTime(i), deletionList, deleteCursor) && (i == rowCount - 1 || getTime(i) != getTime(i + 1))) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index 90956eb12b6d..b179f681bafb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -360,20 +360,6 @@ protected abstract void writeValidValuesIntoTsBlock( TSEncoding encoding, List deletionList); - protected boolean isPointDeleted( - long timestamp, List deletionList, Integer deleteCursor) { - while (deletionList != null && deleteCursor < deletionList.size()) { - if (deletionList.get(deleteCursor).contains(timestamp)) { - return true; - } else if (deletionList.get(deleteCursor).getMax() < timestamp) { - deleteCursor++; - } else { - return false; - } - } - return false; - } - protected float roundValueWithGivenPrecision( float value, int floatPrecision, TSEncoding encoding) { if (!Float.isNaN(value) && (encoding == TSEncoding.RLE || encoding == TSEncoding.TS_2DIFF)) { @@ -414,4 +400,8 @@ public static TVList deserialize(DataInputStream stream) throws IOException { } return null; } + + public List getTimestamps() { + return timestamps; + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/PatternTreeMapTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/PatternTreeMapTest.java index 92f41526fc2e..a3450f72332b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/PatternTreeMapTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/PatternTreeMapTest.java @@ -55,6 +55,11 @@ public void stringAppendPatternTreeMapTest() throws IllegalPathException { patternTreeMap.append(new PartialPath("root.**"), "J"); patternTreeMap.append(new PartialPath("root.**.**"), "K"); + checkOverlappedByDevice( + patternTreeMap, + new PartialPath("root.sg1.d1"), + Arrays.asList("A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K")); + checkOverlapped( patternTreeMap, new PartialPath("root.sg1.d1.s1"), @@ -160,6 +165,47 @@ public void modificationPatternTreeMapTest() throws IllegalPathException { patternTreeMap, new PartialPath("root.sg1.d1.s2"), Collections.singletonList(new Deletion(new PartialPath("root.**"), 5, 10, 100))); + + patternTreeMap.append( + new PartialPath("root.sg1.d2.s1"), + new Deletion(new PartialPath("root.sg1.d2.s1"), 4, 4, 6)); + patternTreeMap.append( + new PartialPath("root.**.s2"), new Deletion(new PartialPath("root.**.s2"), 4, 4, 6)); + patternTreeMap.append( + new PartialPath("root.sg1.d1.s3"), + new Deletion(new PartialPath("root.sg1.d1.s3"), 4, 5, 6)); + patternTreeMap.append( + new PartialPath("root.sg1.d1.*"), new Deletion(new PartialPath("root.sg1.d1.*"), 8, 4, 6)); + patternTreeMap.append( + new PartialPath("root.sg1.d1.*.d3.s5"), + new Deletion(new PartialPath("root.sg1.d1.*.d3.s5"), 2, 4, 6)); + patternTreeMap.append( + 
new PartialPath("root.sg1.d1.*.d3.s4"), + new Deletion(new PartialPath("root.sg1.d1.*.d3.s4"), 3, 4, 6)); + + checkOverlappedByDevice( + patternTreeMap, + new PartialPath("root.sg1.d1"), + Arrays.asList( + new Deletion(new PartialPath("root.sg1.d1.s1"), 1, 1, 3), + new Deletion(new PartialPath("root.sg1.d1.s1"), 1, 6, 10), + new Deletion(new PartialPath("root.**.s1"), 5, 10, 100), + new Deletion(new PartialPath("root.**.s2"), 4, 4, 6), + new Deletion(new PartialPath("root.sg1.d1.s3"), 4, 5, 6), + new Deletion(new PartialPath("root.**.s1"), 10, 100, 200), + new Deletion(new PartialPath("root.sg1.d1.*"), 8, 4, 6), + new Deletion(new PartialPath("root.**"), 5, 10, 100))); + + checkOverlappedByDevice( + patternTreeMap, + new PartialPath("root.sg1.d1.t1.d3"), + Arrays.asList( + new Deletion(new PartialPath("root.**.s1"), 5, 10, 100), + new Deletion(new PartialPath("root.**.s2"), 4, 4, 6), + new Deletion(new PartialPath("root.**.s1"), 10, 100, 200), + new Deletion(new PartialPath("root.**"), 5, 10, 100), + new Deletion(new PartialPath("root.sg1.d1.*.d3.s5"), 2, 4, 6), + new Deletion(new PartialPath("root.sg1.d1.*.d3.s4"), 3, 4, 6))); } private void checkOverlapped( @@ -187,4 +233,13 @@ private void checkOverlappedByDeviceMeasurements( } } } + + private void checkOverlappedByDevice( + PatternTreeMap patternTreeMap, PartialPath devicePath, List expectedList) { + Set resultSet = new HashSet<>(patternTreeMap.getDeviceOverlapped(devicePath)); + Assert.assertEquals(expectedList.size(), resultSet.size()); + for (T o : expectedList) { + Assert.assertTrue(resultSet.contains(o)); + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PatternTreeMap.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PatternTreeMap.java index 130c605c1950..dc35cb5a4c4f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PatternTreeMap.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PatternTreeMap.java @@ -205,4 +205,45 @@ private void searchOverlapped( searchOverlapped(child, deviceNodes, pos + 1, measurements, resultSet); } } + + /** + * Get a list of value lists related to PathPattern that overlapped with device. + * + *

+   * Attention!: The results may contain imprecise and redundant values. Values that appear in
+   * the result set do not necessarily belong to the current device, but values that do not
+   * appear are definitely not related to it.
+   *
+   * @param devicePath device path without wildcard
+   * @return de-duplicated value list
+   */
+  public List getDeviceOverlapped(PartialPath devicePath) {
+    Set resultSet = new HashSet<>();
+    searchDeviceOverlapped(root, devicePath.getNodes(), 0, resultSet);
+    return new ArrayList<>(resultSet);
+  }
+
+  /**
+   * Recursive method that searches the patterns overlapping with the given devicePath.
+   *
+   * @param node current PathPatternNode
+   * @param deviceNodes pathNodes of device
+   * @param pos current index of deviceNodes
+   * @param resultSet result set
+   */
+  private void searchDeviceOverlapped(
+      PathPatternNode node, String[] deviceNodes, int pos, Set resultSet) {
+    if (pos == deviceNodes.length - 1) {
+      resultSet.addAll(node.getValues());
+      for (PathPatternNode child : node.getChildren().values()) {
+        resultSet.addAll(child.getValues());
+      }
+      return;
+    }
+    if (node.isMultiLevelWildcard()) {
+      searchDeviceOverlapped(node, deviceNodes, pos + 1, resultSet);
+    }
+    for (PathPatternNode child : node.getMatchChildren(deviceNodes[pos + 1])) {
+      searchDeviceOverlapped(child, deviceNodes, pos + 1, resultSet);
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index f3054f579aa4..4f5d319baf94 100644
--- a/pom.xml
+++ b/pom.xml
@@ -185,7 +185,7 @@
     1.9
     0.11.1
     1.5.5-5
-    1.0.1-6cbbba7-SNAPSHOT
+    1.0.1-ee39f42-SNAPSHOT
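
The int[]-based cursor introduced in ModificationUtils.isPointDeleted replaces the boxed Integer cursor that the removed TVList helper used. Because Java passes the Integer by value, increments inside the old method never reached the caller, so every call rescanned the deletion list from the start; the single-element array lets the position persist across successive calls over ascending timestamps. A minimal, self-contained sketch of the intended call pattern (the class name IsPointDeletedExample and the sample data are illustrative only, not part of this patch):

import org.apache.iotdb.db.utils.ModificationUtils;
import org.apache.tsfile.read.common.TimeRange;

import java.util.Arrays;
import java.util.List;

public class IsPointDeletedExample {
  public static void main(String[] args) {
    // Two sorted, merged deletion intervals: [5, 10] and [20, 30].
    List<TimeRange> deletionList =
        Arrays.asList(new TimeRange(5, 10), new TimeRange(20, 30));

    // The cursor must be a single-element array; isPointDeleted advances it in place
    // once the timestamps move past a deletion interval, so later calls skip it.
    int[] deleteCursor = {0};

    for (long t : new long[] {1, 5, 12, 25, 40}) {
      boolean deleted = ModificationUtils.isPointDeleted(t, deletionList, deleteCursor);
      System.out.println("t=" + t + " deleted=" + deleted);
    }
    // Timestamps 5 and 25 fall inside the intervals and are reported as deleted;
    // 1, 12 and 40 are kept.
  }
}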