Skip to content

Commit

Permalink
add measurement set in insertRecordsReq
Browse files Browse the repository at this point in the history
  • Loading branch information
THUMarkLau committed Feb 22, 2024
1 parent 4def5b0 commit c25c445
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,28 @@

package org.apache.iotdb.db.protocol.thrift.impl;

import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize;

import io.airlift.units.Duration;
import io.jsonwebtoken.lang.Strings;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
Expand Down Expand Up @@ -176,34 +198,10 @@
import org.apache.iotdb.tsfile.utils.TimeDuration;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

import io.airlift.units.Duration;
import io.jsonwebtoken.lang.Strings;
import org.apache.commons.lang3.StringUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize;

public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(ClientRPCServiceImpl.class);
Expand Down Expand Up @@ -1639,7 +1637,8 @@ public TSStatus insertRecords(TSInsertRecordsReq req) {

// check whether measurement is legal according to syntax convention
req.setMeasurementsList(
PathUtils.checkIsLegalSingleMeasurementsListsAndUpdateForRows(req.getMeasurementsList()));
PathUtils.checkIsLegalSingleMeasurementsListsAndUpdateForRows(
req.getMeasurementsSet(), req.getMeasurementsList()));

// Step 1: transfer from TSInsertRecordsReq to Statement
InsertRowsStatement statement = StatementGenerator.createStatement(req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
*/
package org.apache.iotdb.commons.utils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
Expand All @@ -26,19 +33,9 @@
import org.apache.iotdb.tsfile.read.common.parser.PathNodesGenerator;
import org.apache.iotdb.tsfile.read.common.parser.PathVisitor;
import org.apache.iotdb.tsfile.utils.Pair;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

public class PathUtils {
private static final Logger log = LoggerFactory.getLogger(PathUtils.class);
private static AtomicLong totalTimeCost = new AtomicLong(0);
Expand Down Expand Up @@ -95,41 +92,14 @@ public static List<List<String>> checkIsLegalSingleMeasurementListsAndUpdate(
}

public static List<List<String>> checkIsLegalSingleMeasurementsListsAndUpdateForRows(
List<List<String>> measurementLists) throws MetadataException {
Set<String> measurements, List<List<String>> measurementLists) throws MetadataException {
long startTime = System.nanoTime();
if (Objects.isNull(measurementLists)) {
return null;
}
Set<String> measurementSet = new HashSet<>();
for (List<String> measurements : measurementLists) {
if (Objects.isNull(measurements)) {
continue;
}
for (String measurement : measurements) {
if (Objects.isNull(measurement)) {
continue;
}
measurementSet.add(measurement);
}
}

Map<String, String> needToUpdate = new HashMap<>();
checkLegalSingleMeasurementsAndSkipDuplicateForRows(measurementSet, needToUpdate);
for (Map.Entry<String, String> entry : needToUpdate.entrySet()) {
for (List<String> measurements : measurementLists) {
if (Objects.isNull(measurements)) {
continue;
}
for (int i = 0; i < measurements.size(); i++) {
if (Objects.isNull(measurements.get(i))) {
continue;
}
if (measurements.get(i).equals(entry.getKey())) {
measurements.set(i, entry.getValue());
}
}
}
}
checkLegalSingleMeasurementsAndSkipDuplicateForRows(measurements, needToUpdate);
totalTimeCost.addAndGet(System.nanoTime() - startTime);
long count = totalCount.incrementAndGet();
if (count % 1000 == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ struct TSInsertRecordsReq {
4: required list<binary> valuesList
5: required list<i64> timestamps
6: optional bool isAligned
7: optional set<string> measurementsSet
}

struct TSInsertRecordsOfOneDeviceReq {
Expand Down

0 comments on commit c25c445

Please sign in to comment.