From c25c445fc2a512a2e84cf7ba18c6e0a99e420d02 Mon Sep 17 00:00:00 2001 From: Liu Xuxin Date: Thu, 11 Jan 2024 21:40:03 +0800 Subject: [PATCH] add measurement set in insertRecordsReq --- .../thrift/impl/ClientRPCServiceImpl.java | 49 +++++++++---------- .../apache/iotdb/commons/utils/PathUtils.java | 48 ++++-------------- .../src/main/thrift/client.thrift | 1 + 3 files changed, 34 insertions(+), 64 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 0ae4f5091a7e..5c9519040aa4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -19,6 +19,28 @@ package org.apache.iotdb.db.protocol.thrift.impl; +import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED; +import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode; +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; +import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize; + +import io.airlift.units.Duration; +import io.jsonwebtoken.lang.Strings; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.iotdb.common.rpc.thrift.TAggregationType; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; @@ -176,34 +198,10 @@ import org.apache.iotdb.tsfile.utils.TimeDuration; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - -import io.airlift.units.Duration; -import io.jsonwebtoken.lang.Strings; -import org.apache.commons.lang3.StringUtils; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - -import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED; -import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode; -import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; -import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; -import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; -import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException; -import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; -import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize; - public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ClientRPCServiceImpl.class); @@ -1639,7 +1637,8 @@ public TSStatus insertRecords(TSInsertRecordsReq req) { // check whether measurement is legal according to syntax convention req.setMeasurementsList( - PathUtils.checkIsLegalSingleMeasurementsListsAndUpdateForRows(req.getMeasurementsList())); + PathUtils.checkIsLegalSingleMeasurementsListsAndUpdateForRows( + req.getMeasurementsSet(), req.getMeasurementsList())); // Step 1: transfer from TSInsertRecordsReq to Statement InsertRowsStatement statement = StatementGenerator.createStatement(req); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java index 0763fe4b858f..313bb6a2a178 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java @@ -18,6 +18,13 @@ */ package org.apache.iotdb.commons.utils; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; @@ -26,19 +33,9 @@ import org.apache.iotdb.tsfile.read.common.parser.PathNodesGenerator; import org.apache.iotdb.tsfile.read.common.parser.PathVisitor; import org.apache.iotdb.tsfile.utils.Pair; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - public class PathUtils { private static final Logger log = LoggerFactory.getLogger(PathUtils.class); private static AtomicLong totalTimeCost = new AtomicLong(0); @@ -95,41 +92,14 @@ public static List> checkIsLegalSingleMeasurementListsAndUpdate( } public static List> checkIsLegalSingleMeasurementsListsAndUpdateForRows( - List> measurementLists) throws MetadataException { + Set measurements, List> measurementLists) throws MetadataException { long startTime = System.nanoTime(); if (Objects.isNull(measurementLists)) { return null; } - Set measurementSet = new HashSet<>(); - for (List measurements : measurementLists) { - if (Objects.isNull(measurements)) { - continue; - } - for (String measurement : measurements) { - if (Objects.isNull(measurement)) { - continue; - } - measurementSet.add(measurement); - } - } Map needToUpdate = new HashMap<>(); - checkLegalSingleMeasurementsAndSkipDuplicateForRows(measurementSet, needToUpdate); - for (Map.Entry entry : needToUpdate.entrySet()) { - for (List measurements : measurementLists) { - if (Objects.isNull(measurements)) { - continue; - } - for (int i = 0; i < measurements.size(); i++) { - if (Objects.isNull(measurements.get(i))) { - continue; - } - if (measurements.get(i).equals(entry.getKey())) { - measurements.set(i, entry.getValue()); - } - } - } - } + checkLegalSingleMeasurementsAndSkipDuplicateForRows(measurements, needToUpdate); totalTimeCost.addAndGet(System.nanoTime() - startTime); long count = totalCount.incrementAndGet(); if (count % 1000 == 0) { diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift index 2b2f55857267..0e08ea0fffdb 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift @@ -253,6 +253,7 @@ struct TSInsertRecordsReq { 4: required list valuesList 5: required list timestamps 6: optional bool isAligned + 7: optional set measurementsSet } struct TSInsertRecordsOfOneDeviceReq {