From a7b15f53bc0d7da9841451f8c8ef4205a230ea81 Mon Sep 17 00:00:00 2001 From: HTHou Date: Wed, 3 Dec 2025 17:38:07 +0800 Subject: [PATCH 01/15] add base32 Object Path --- .../planner/plan/node/write/ObjectNode.java | 43 +++--- .../node/write/RelationalInsertRowsNode.java | 25 +-- .../write/RelationalInsertTabletNode.java | 18 +-- .../dataregion/Base32ObjectPath.java | 146 ++++++++++++++++++ .../storageengine/dataregion/DataRegion.java | 6 +- .../storageengine/dataregion/IObjectPath.java | 52 +++++++ .../tsfile/generator/TsFileNameGenerator.java | 17 -- .../rescon/disk/TierManager.java | 11 -- .../iotdb/db/utils/ObjectTypeUtils.java | 11 +- 9 files changed, 253 insertions(+), 76 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java index 10fb9bc3443a..cef10d612fa9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java @@ -33,8 +33,9 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; -import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; +import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; +import org.apache.iotdb.db.storageengine.dataregion.IObjectPath.Deserializer; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -60,7 +61,7 @@ public class ObjectNode extends SearchNode implements WALEntryValue { private byte[] content; - private String filePath; + private IObjectPath filePath; private final int contentLength; @@ -68,7 +69,7 @@ public class ObjectNode extends SearchNode implements WALEntryValue { private boolean isGeneratedByRemoteConsensusLeader; - public ObjectNode(boolean isEOF, long offset, byte[] content, String filePath) { + public ObjectNode(boolean isEOF, long offset, byte[] content, IObjectPath filePath) { super(new PlanNodeId("")); this.isEOF = isEOF; this.offset = offset; @@ -77,7 +78,7 @@ public ObjectNode(boolean isEOF, long offset, byte[] content, String filePath) { this.contentLength = content.length; } - public ObjectNode(boolean isEOF, long offset, int contentLength, String filePath) { + public ObjectNode(boolean isEOF, long offset, int contentLength, IObjectPath filePath) { super(new PlanNodeId("")); this.isEOF = isEOF; this.offset = offset; @@ -97,12 +98,12 @@ public long getOffset() { return offset; } - public void setFilePath(String filePath) { + public void setFilePath(IObjectPath filePath) { this.filePath = filePath; } - public String getFilePath() { - return filePath; + public String getFilePathString() { + return filePath.toString(); } @Override @@ -111,7 +112,11 @@ public void serializeToWAL(IWALByteBufferView buffer) { buffer.putLong(searchIndex); buffer.put((byte) (isEOF ? 1 : 0)); buffer.putLong(offset); - WALWriteUtils.write(filePath, buffer); + try { + filePath.serialize(buffer); + } catch (IOException e) { + throw new RuntimeException(e); + } buffer.putInt(content.length); } @@ -122,14 +127,14 @@ public int serializedSize() { + Byte.BYTES + Long.BYTES + Integer.BYTES - + ReadWriteIOUtils.sizeToWrite(filePath); + + filePath.getSerializedSize(); } public static ObjectNode deserializeFromWAL(DataInputStream stream) throws IOException { long searchIndex = stream.readLong(); boolean isEOF = stream.readByte() == 1; long offset = stream.readLong(); - String filePath = ReadWriteIOUtils.readString(stream); + IObjectPath filePath = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(stream); int contentLength = stream.readInt(); ObjectNode objectNode = new ObjectNode(isEOF, offset, contentLength, filePath); objectNode.setSearchIndex(searchIndex); @@ -140,8 +145,9 @@ public static ObjectNode deserializeFromWAL(ByteBuffer buffer) { long searchIndex = buffer.getLong(); boolean isEOF = buffer.get() == 1; long offset = buffer.getLong(); - String filePath = ReadWriteIOUtils.readString(buffer); - Optional objectFile = TierManager.getInstance().getAbsoluteObjectFilePath(filePath); + IObjectPath filePath = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(buffer); + Optional objectFile = + TierManager.getInstance().getAbsoluteObjectFilePath(filePath.toString()); int contentLength = buffer.getInt(); byte[] contents = new byte[contentLength]; if (objectFile.isPresent()) { @@ -152,7 +158,7 @@ public static ObjectNode deserializeFromWAL(ByteBuffer buffer) { throw new RuntimeException(e); } } else { - throw new ObjectFileNotExist(filePath); + throw new ObjectFileNotExist(filePath.toString()); } ObjectNode objectNode = new ObjectNode(isEOF, offset, contents, filePath); @@ -163,7 +169,7 @@ public static ObjectNode deserializeFromWAL(ByteBuffer buffer) { public static ObjectNode deserialize(ByteBuffer byteBuffer) { boolean isEoF = ReadWriteIOUtils.readBool(byteBuffer); long offset = ReadWriteIOUtils.readLong(byteBuffer); - String filePath = ReadWriteIOUtils.readString(byteBuffer); + IObjectPath filePath = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(byteBuffer); int contentLength = ReadWriteIOUtils.readInt(byteBuffer); byte[] content = ReadWriteIOUtils.readBytes(byteBuffer, contentLength); return new ObjectNode(isEoF, offset, content, filePath); @@ -227,7 +233,7 @@ protected void serializeAttributes(ByteBuffer byteBuffer) { getType().serialize(byteBuffer); ReadWriteIOUtils.write(isEOF, byteBuffer); ReadWriteIOUtils.write(offset, byteBuffer); - ReadWriteIOUtils.write(filePath, byteBuffer); + filePath.serialize(byteBuffer); ReadWriteIOUtils.write(contentLength, byteBuffer); byteBuffer.put(content); } @@ -237,7 +243,7 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException { getType().serialize(stream); ReadWriteIOUtils.write(isEOF, stream); ReadWriteIOUtils.write(offset, stream); - ReadWriteIOUtils.write(filePath, stream); + filePath.serialize(stream); ReadWriteIOUtils.write(contentLength, stream); stream.write(content); } @@ -251,7 +257,8 @@ public ByteBuffer serialize() { byte[] contents = new byte[contentLength]; boolean readSuccess = false; for (int i = 0; i < 2; i++) { - Optional objectFile = TierManager.getInstance().getAbsoluteObjectFilePath(filePath); + Optional objectFile = + TierManager.getInstance().getAbsoluteObjectFilePath(filePath.toString()); if (objectFile.isPresent()) { try { readContentFromFile(objectFile.get(), contents); @@ -279,7 +286,7 @@ public ByteBuffer serialize() { } ReadWriteIOUtils.write(readSuccess && isEOF, stream); ReadWriteIOUtils.write(offset, stream); - ReadWriteIOUtils.write(filePath, stream); + filePath.serialize(stream); ReadWriteIOUtils.write(contentLength, stream); stream.write(contents); return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java index 83f6bbec63e0..2d457cc9366e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java @@ -27,19 +27,17 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; +import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; import org.apache.tsfile.utils.Binary; -import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -212,22 +210,25 @@ private void handleObjectValue( boolean isEoF = buffer.get() == 1; long offset = buffer.getLong(); byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining()); - String relativePath = - TsFileNameGenerator.generateObjectFilePath( + IObjectPath relativePath = + IObjectPath.Factory.DEFAULT_FACTORY.create( dataRegionReplicaSet.getRegionId().getId(), insertRowNode.getTime(), insertRowNode.getDeviceID(), insertRowNode.getMeasurements()[j]); ObjectNode objectNode = new ObjectNode(isEoF, offset, content, relativePath); objectNode.setDataRegionReplicaSet(dataRegionReplicaSet); - byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8); - byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES]; - System.arraycopy( - BytesUtils.longToBytes(offset + content.length), 0, valueBytes, 0, Long.BYTES); - System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length); - ((Binary) values[j]).setValues(valueBytes); - insertRowNode.setValues(values); writePlanNodeList.add(objectNode); + if (isEoF) { + ByteBuffer valueBytes = + ByteBuffer.allocate(relativePath.getSerializedSize() + Long.BYTES); + valueBytes.putLong(offset + content.length); + relativePath.serialize(valueBytes); + ((Binary) values[j]).setValues(valueBytes.array()); + insertRowNode.setValues(values); + } else { + values[j] = null; + } } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index e3a114211e16..cc4cdcff01c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -32,8 +32,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; +import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; @@ -41,7 +41,6 @@ import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; -import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -50,7 +49,6 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -470,19 +468,17 @@ private void handleObjectValue( boolean isEoF = buffer.get() == 1; long offset = buffer.getLong(); byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining()); - String relativePath = - TsFileNameGenerator.generateObjectFilePath( + IObjectPath relativePath = + IObjectPath.Factory.DEFAULT_FACTORY.create( entry.getKey().getRegionId().getId(), times[j], getDeviceID(j), measurements[column]); ObjectNode objectNode = new ObjectNode(isEoF, offset, content, relativePath); objectNode.setDataRegionReplicaSet(entry.getKey()); result.add(objectNode); if (isEoF) { - byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8); - byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES]; - System.arraycopy( - BytesUtils.longToBytes(offset + content.length), 0, valueBytes, 0, Long.BYTES); - System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length); - ((Binary[]) columns[column])[j] = new Binary(valueBytes); + ByteBuffer valueBytes = ByteBuffer.allocate(relativePath.getSerializedSize() + Long.BYTES); + valueBytes.putLong(offset + content.length); + relativePath.serialize(valueBytes); + ((Binary[]) columns[column])[j] = new Binary(valueBytes.array()); } else { ((Binary[]) columns[column])[j] = null; if (bitMaps == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java new file mode 100644 index 000000000000..728df25e0073 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion; + +import com.google.common.io.BaseEncoding; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class Base32ObjectPath implements IObjectPath { + + Path path; + int serializedSize = -1; + + private static final Deserializer DESERIALIZER = + new Deserializer() { + @Override + public IObjectPath deserializeFrom(ByteBuffer byteBuffer) { + return deserialize(byteBuffer); + } + + @Override + public IObjectPath deserializeFrom(InputStream inputStream) throws IOException { + return deserialize(inputStream); + } + }; + + private static final Factory FACTORY = Base32ObjectPath::new; + + public Base32ObjectPath(String first, String... more) { + path = Paths.get(first, more); + } + + public Base32ObjectPath(int regionId, long time, IDeviceID iDeviceID, String measurement) { + Object[] segments = iDeviceID.getSegments(); + String[] pathSegments = new String[segments.length + 2]; + for (int i = 0; i < segments.length; i++) { + Object segment = segments[i]; + String segmentString = segment == null ? "null" : segment.toString(); + pathSegments[i] = + BaseEncoding.base32() + .omitPadding() + .encode(segmentString.getBytes(StandardCharsets.UTF_8)); + } + pathSegments[pathSegments.length - 2] = + BaseEncoding.base32().omitPadding().encode(measurement.getBytes(StandardCharsets.UTF_8)); + pathSegments[pathSegments.length - 1] = time + ".bin"; + path = Paths.get(String.valueOf(regionId), pathSegments); + } + + @Override + public int serialize(ByteBuffer byteBuffer) { + int cnt = 0; + cnt += ReadWriteForEncodingUtils.writeUnsignedVarInt(path.getNameCount(), byteBuffer); + for (Path segment : path) { + cnt += ReadWriteIOUtils.writeVar(segment.toString(), byteBuffer); + } + return cnt; + } + + @Override + public int serialize(OutputStream outputStream) throws IOException { + int cnt = 0; + cnt += ReadWriteForEncodingUtils.writeUnsignedVarInt(path.getNameCount(), outputStream); + for (Path segment : path) { + cnt += ReadWriteIOUtils.writeVar(segment.toString(), outputStream); + } + return cnt; + } + + @Override + public int getSerializedSize() { + if (serializedSize != -1) { + return serializedSize; + } + int cnt = ReadWriteForEncodingUtils.varIntSize(path.getNameCount()); + for (Path segment : path) { + byte[] bytes = segment.toString().getBytes(StandardCharsets.UTF_8); + cnt += ReadWriteForEncodingUtils.varIntSize(bytes.length); + cnt += bytes.length; + } + serializedSize = cnt; + return cnt; + } + + public static Base32ObjectPath deserialize(ByteBuffer byteBuffer) { + int cnt = ReadWriteForEncodingUtils.readUnsignedVarInt(byteBuffer); + String first = ReadWriteIOUtils.readVarIntString(byteBuffer); + String[] more = new String[cnt - 1]; + + for (int i = 0; i < cnt - 1; ++i) { + more[i] = ReadWriteIOUtils.readVarIntString(byteBuffer); + } + return new Base32ObjectPath(first, more); + } + + public static Base32ObjectPath deserialize(InputStream stream) throws IOException { + int cnt = ReadWriteForEncodingUtils.readUnsignedVarInt(stream); + String first = ReadWriteIOUtils.readVarIntString(stream); + String[] more = new String[cnt - 1]; + + for (int i = 0; i < cnt - 1; ++i) { + more[i] = ReadWriteIOUtils.readVarIntString(stream); + } + + return new Base32ObjectPath(first, more); + } + + @Override + public String toString() { + return path.toString(); + } + + public static Factory getFACTORY() { + return FACTORY; + } + + public static Deserializer getDESERIALIZER() { + return DESERIALIZER; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 1124e33a7df9..a3427defdbf8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -3582,7 +3582,7 @@ public int compact() { public void writeObject(ObjectNode objectNode) throws Exception { writeLock("writeObject"); try { - String relativeTmpPathString = objectNode.getFilePath() + ".tmp"; + String relativeTmpPathString = objectNode.getFilePathString() + ".tmp"; String objectFileDir = TierManager.getInstance().getNextFolderForObjectFile(); File objectTmpFile = FSFactoryProducer.getFSFactory().getFile(objectFileDir, relativeTmpPathString); @@ -3594,9 +3594,9 @@ public void writeObject(ObjectNode objectNode) throws Exception { } if (objectNode.isEOF()) { File objectFile = - FSFactoryProducer.getFSFactory().getFile(objectFileDir, objectNode.getFilePath()); + FSFactoryProducer.getFSFactory().getFile(objectFileDir, objectNode.getFilePathString()); if (objectFile.exists()) { - String relativeBackPathString = objectNode.getFilePath() + ".back"; + String relativeBackPathString = objectNode.getFilePathString() + ".back"; File objectBackFile = FSFactoryProducer.getFSFactory().getFile(objectFileDir, relativeBackPathString); Files.move( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java new file mode 100644 index 000000000000..ee0613a1c85d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion; + +import org.apache.tsfile.file.metadata.IDeviceID; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public interface IObjectPath { + + int serialize(ByteBuffer byteBuffer); + + int serialize(OutputStream outputStream) throws IOException; + + int getSerializedSize(); + + interface Factory { + + IObjectPath create(int regionId, long time, IDeviceID iDeviceID, String measurement); + + Factory DEFAULT_FACTORY = Base32ObjectPath.getFACTORY(); + } + + interface Deserializer { + + IObjectPath deserializeFrom(ByteBuffer byteBuffer); + + IObjectPath deserializeFrom(InputStream inputStream) throws IOException; + + Deserializer DEFAULT_DESERIALIZER = Base32ObjectPath.getDESERIALIZER(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java index 8ad353a96ef6..16be82188e9c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java @@ -28,7 +28,6 @@ import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.tsfile.common.constant.TsFileConstant; -import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.fileSystem.FSFactoryProducer; import org.apache.tsfile.fileSystem.fsFactory.FSFactory; import org.slf4j.Logger; @@ -176,22 +175,6 @@ public static TsFileName getTsFileName(String fileName) throws IOException { } } - public static String generateObjectFilePath( - int regionId, long time, IDeviceID iDeviceID, String measurement) { - String objectFileName = time + ".bin"; - Object[] segments = iDeviceID.getSegments(); - StringBuilder relativePathString = - new StringBuilder(String.valueOf(regionId)).append(File.separator); - for (Object segment : segments) { - relativePathString - .append(segment == null ? "null" : segment.toString().toLowerCase()) - .append(File.separator); - } - relativePathString.append(measurement).append(File.separator); - relativePathString.append(objectFileName); - return relativePathString.toString(); - } - @TestOnly public static TsFileResource increaseCrossCompactionCnt(TsFileResource tsFileResource) throws IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java index 6355880cebd2..a5fa8b54e7b1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.storageengine.rescon.disk; import org.apache.iotdb.commons.conf.IoTDBConstant; -import org.apache.iotdb.commons.exception.ObjectFileNotExist; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; @@ -345,16 +344,6 @@ private long[] getTierDiskSpace(DiskSpaceType type) { return tierDiskSpace; } - public File getObjectFile(String relativePath) { - for (String folder : objectDirs) { - File file = new File(folder, relativePath); - if (file.exists()) { - return file; - } - } - throw new ObjectFileNotExist(relativePath); - } - private enum DiskSpaceType { TOTAL, USABLE, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java index c153061a90d6..2c2ce69ac196 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java @@ -19,8 +19,11 @@ package org.apache.iotdb.db.utils; +import java.nio.ByteBuffer; import org.apache.iotdb.commons.exception.ObjectFileNotExist; import org.apache.iotdb.db.service.metrics.FileMetrics; +import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; +import org.apache.iotdb.db.storageengine.dataregion.IObjectPath.Deserializer; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.tsfile.common.conf.TSFileConfig; @@ -42,8 +45,8 @@ private ObjectTypeUtils() {} public static File getObjectPathFromBinary(Binary binary) { byte[] bytes = binary.getValues(); - String relativeObjectFilePath = - new String(bytes, 8, bytes.length - 8, TSFileConfig.STRING_CHARSET); + ByteBuffer buffer = ByteBuffer.wrap(bytes, 8, bytes.length - 8); + String relativeObjectFilePath = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(buffer).toString(); Optional file = TIER_MANAGER.getAbsoluteObjectFilePath(relativeObjectFilePath); if (!file.isPresent()) { throw new ObjectFileNotExist(relativeObjectFilePath); @@ -54,8 +57,8 @@ public static File getObjectPathFromBinary(Binary binary) { public static Optional getNullableObjectPathFromBinary( Binary binary, boolean needTempFile) { byte[] bytes = binary.getValues(); - String relativeObjectFilePath = - new String(bytes, 8, bytes.length - 8, TSFileConfig.STRING_CHARSET); + ByteBuffer buffer = ByteBuffer.wrap(bytes, 8, bytes.length - 8); + String relativeObjectFilePath = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(buffer).toString(); return TIER_MANAGER.getAbsoluteObjectFilePath(relativeObjectFilePath, needTempFile); } From 3e9353efd14eedacc96619f172916202da54150f Mon Sep 17 00:00:00 2001 From: HTHou Date: Wed, 3 Dec 2025 18:01:15 +0800 Subject: [PATCH 02/15] add plainObjectPath and configuration --- .../org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++ .../planner/plan/node/write/ObjectNode.java | 10 +- .../node/write/RelationalInsertRowsNode.java | 2 +- .../write/RelationalInsertTabletNode.java | 4 +- .../dataregion/Base32ObjectPath.java | 6 +- .../storageengine/dataregion/IObjectPath.java | 15 ++- .../dataregion/PlainObjectPath.java | 106 ++++++++++++++++++ .../iotdb/db/utils/ObjectTypeUtils.java | 10 +- 8 files changed, 146 insertions(+), 18 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 375285dfa2fb..9ec3888198e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1188,8 +1188,11 @@ public class IoTDBConfig { private ConcurrentHashMap tsFileDBToEncryptMap = new ConcurrentHashMap<>( Collections.singletonMap("root.__audit", new EncryptParameter("UNENCRYPTED", null))); + private long maxObjectSizeInByte = 4 * 1024 * 1024 * 1024L; + private boolean restrictObjectLimit = false; + IoTDBConfig() {} public int getMaxLogEntriesNumPerBatch() { @@ -4269,4 +4272,12 @@ public long getMaxObjectSizeInByte() { public void setMaxObjectSizeInByte(long maxObjectSizeInByte) { this.maxObjectSizeInByte = maxObjectSizeInByte; } + + public boolean getRestrictObjectLimit() { + return restrictObjectLimit; + } + + public void setRestrictObjectLimit(boolean restrictObjectLimit) { + this.restrictObjectLimit = restrictObjectLimit; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java index cef10d612fa9..15c24bd3794e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java @@ -29,13 +29,13 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; +import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; +import org.apache.iotdb.db.storageengine.dataregion.IObjectPath.Deserializer; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; -import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; -import org.apache.iotdb.db.storageengine.dataregion.IObjectPath.Deserializer; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -134,7 +134,7 @@ public static ObjectNode deserializeFromWAL(DataInputStream stream) throws IOExc long searchIndex = stream.readLong(); boolean isEOF = stream.readByte() == 1; long offset = stream.readLong(); - IObjectPath filePath = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(stream); + IObjectPath filePath = Deserializer.DESERIALIZER.deserializeFrom(stream); int contentLength = stream.readInt(); ObjectNode objectNode = new ObjectNode(isEOF, offset, contentLength, filePath); objectNode.setSearchIndex(searchIndex); @@ -145,7 +145,7 @@ public static ObjectNode deserializeFromWAL(ByteBuffer buffer) { long searchIndex = buffer.getLong(); boolean isEOF = buffer.get() == 1; long offset = buffer.getLong(); - IObjectPath filePath = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(buffer); + IObjectPath filePath = Deserializer.DESERIALIZER.deserializeFrom(buffer); Optional objectFile = TierManager.getInstance().getAbsoluteObjectFilePath(filePath.toString()); int contentLength = buffer.getInt(); @@ -169,7 +169,7 @@ public static ObjectNode deserializeFromWAL(ByteBuffer buffer) { public static ObjectNode deserialize(ByteBuffer byteBuffer) { boolean isEoF = ReadWriteIOUtils.readBool(byteBuffer); long offset = ReadWriteIOUtils.readLong(byteBuffer); - IObjectPath filePath = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(byteBuffer); + IObjectPath filePath = Deserializer.DESERIALIZER.deserializeFrom(byteBuffer); int contentLength = ReadWriteIOUtils.readInt(byteBuffer); byte[] content = ReadWriteIOUtils.readBytes(byteBuffer, contentLength); return new ObjectNode(isEoF, offset, content, filePath); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java index 2d457cc9366e..4186c097a5dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java @@ -211,7 +211,7 @@ private void handleObjectValue( long offset = buffer.getLong(); byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining()); IObjectPath relativePath = - IObjectPath.Factory.DEFAULT_FACTORY.create( + IObjectPath.Factory.FACTORY.create( dataRegionReplicaSet.getRegionId().getId(), insertRowNode.getTime(), insertRowNode.getDeviceID(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index cc4cdcff01c8..89c32c2f593b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -32,8 +32,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache; -import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; +import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; @@ -469,7 +469,7 @@ private void handleObjectValue( long offset = buffer.getLong(); byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining()); IObjectPath relativePath = - IObjectPath.Factory.DEFAULT_FACTORY.create( + IObjectPath.Factory.FACTORY.create( entry.getKey().getRegionId().getId(), times[j], getDeviceID(j), measurements[column]); ObjectNode objectNode = new ObjectNode(isEoF, offset, content, relativePath); objectNode.setDataRegionReplicaSet(entry.getKey()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java index 728df25e0073..273b1dc40075 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java @@ -34,8 +34,8 @@ public class Base32ObjectPath implements IObjectPath { - Path path; - int serializedSize = -1; + private final Path path; + private int serializedSize = -1; private static final Deserializer DESERIALIZER = new Deserializer() { @@ -52,7 +52,7 @@ public IObjectPath deserializeFrom(InputStream inputStream) throws IOException { private static final Factory FACTORY = Base32ObjectPath::new; - public Base32ObjectPath(String first, String... more) { + private Base32ObjectPath(String first, String... more) { path = Paths.get(first, more); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java index ee0613a1c85d..76a1f21a306f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java @@ -19,6 +19,9 @@ package org.apache.iotdb.db.storageengine.dataregion; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; + import org.apache.tsfile.file.metadata.IDeviceID; import java.io.IOException; @@ -28,6 +31,8 @@ public interface IObjectPath { + IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + int serialize(ByteBuffer byteBuffer); int serialize(OutputStream outputStream) throws IOException; @@ -38,7 +43,10 @@ interface Factory { IObjectPath create(int regionId, long time, IDeviceID iDeviceID, String measurement); - Factory DEFAULT_FACTORY = Base32ObjectPath.getFACTORY(); + Factory FACTORY = + CONFIG.getRestrictObjectLimit() + ? Base32ObjectPath.getFACTORY() + : PlainObjectPath.getFACTORY(); } interface Deserializer { @@ -47,6 +55,9 @@ interface Deserializer { IObjectPath deserializeFrom(InputStream inputStream) throws IOException; - Deserializer DEFAULT_DESERIALIZER = Base32ObjectPath.getDESERIALIZER(); + Deserializer DESERIALIZER = + CONFIG.getRestrictObjectLimit() + ? Base32ObjectPath.getDESERIALIZER() + : PlainObjectPath.getDESERIALIZER(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java new file mode 100644 index 000000000000..35bc91a8b08c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class PlainObjectPath implements IObjectPath { + + private final String filePath; + + private static final Deserializer DESERIALIZER = + new Deserializer() { + @Override + public IObjectPath deserializeFrom(ByteBuffer byteBuffer) { + return deserialize(byteBuffer); + } + + @Override + public IObjectPath deserializeFrom(InputStream inputStream) throws IOException { + return deserialize(inputStream); + } + }; + + private static final Factory FACTORY = PlainObjectPath::new; + + private PlainObjectPath(String filePath) { + this.filePath = filePath; + } + + public PlainObjectPath(int regionId, long time, IDeviceID iDeviceID, String measurement) { + String objectFileName = time + ".bin"; + Object[] segments = iDeviceID.getSegments(); + StringBuilder relativePathString = + new StringBuilder(String.valueOf(regionId)).append(File.separator); + for (Object segment : segments) { + relativePathString + .append(segment == null ? "null" : segment.toString().toLowerCase()) + .append(File.separator); + } + relativePathString.append(measurement).append(File.separator); + relativePathString.append(objectFileName); + this.filePath = relativePathString.toString(); + } + + @Override + public int serialize(ByteBuffer byteBuffer) { + return ReadWriteIOUtils.write(filePath, byteBuffer); + } + + @Override + public int serialize(OutputStream outputStream) throws IOException { + return ReadWriteIOUtils.write(filePath, outputStream); + } + + @Override + public int getSerializedSize() { + return ReadWriteIOUtils.sizeToWrite(filePath); + } + + public static PlainObjectPath deserialize(ByteBuffer byteBuffer) { + String filePath = ReadWriteIOUtils.readString(byteBuffer); + return new PlainObjectPath(filePath); + } + + public static PlainObjectPath deserialize(InputStream stream) throws IOException { + String filePath = ReadWriteIOUtils.readString(stream); + return new PlainObjectPath(filePath); + } + + @Override + public String toString() { + return filePath; + } + + public static Factory getFACTORY() { + return FACTORY; + } + + public static Deserializer getDESERIALIZER() { + return DESERIALIZER; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java index 2c2ce69ac196..9a0b5c577951 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java @@ -19,20 +19,18 @@ package org.apache.iotdb.db.utils; -import java.nio.ByteBuffer; import org.apache.iotdb.commons.exception.ObjectFileNotExist; import org.apache.iotdb.db.service.metrics.FileMetrics; import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; -import org.apache.iotdb.db.storageengine.dataregion.IObjectPath.Deserializer; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; -import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.utils.Binary; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.util.Optional; @@ -46,7 +44,8 @@ private ObjectTypeUtils() {} public static File getObjectPathFromBinary(Binary binary) { byte[] bytes = binary.getValues(); ByteBuffer buffer = ByteBuffer.wrap(bytes, 8, bytes.length - 8); - String relativeObjectFilePath = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(buffer).toString(); + String relativeObjectFilePath = + IObjectPath.Deserializer.DESERIALIZER.deserializeFrom(buffer).toString(); Optional file = TIER_MANAGER.getAbsoluteObjectFilePath(relativeObjectFilePath); if (!file.isPresent()) { throw new ObjectFileNotExist(relativeObjectFilePath); @@ -58,7 +57,8 @@ public static Optional getNullableObjectPathFromBinary( Binary binary, boolean needTempFile) { byte[] bytes = binary.getValues(); ByteBuffer buffer = ByteBuffer.wrap(bytes, 8, bytes.length - 8); - String relativeObjectFilePath = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(buffer).toString(); + String relativeObjectFilePath = + IObjectPath.Deserializer.DESERIALIZER.deserializeFrom(buffer).toString(); return TIER_MANAGER.getAbsoluteObjectFilePath(relativeObjectFilePath, needTempFile); } From caac8151b62208725d394f6bba26dac13e9d471b Mon Sep 17 00:00:00 2001 From: HTHou Date: Wed, 3 Dec 2025 19:02:27 +0800 Subject: [PATCH 03/15] change default configuration --- .../src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 322cf61182eb..9ec3888198e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1191,7 +1191,7 @@ public class IoTDBConfig { private long maxObjectSizeInByte = 4 * 1024 * 1024 * 1024L; - private boolean restrictObjectLimit = true; + private boolean restrictObjectLimit = false; IoTDBConfig() {} From 1616d2ab19c37122cad2ba626eea9057e0e9cd55 Mon Sep 17 00:00:00 2001 From: HTHou Date: Fri, 5 Dec 2025 11:09:23 +0800 Subject: [PATCH 04/15] fix UT errors --- .../node/write/RelationalInsertRowsNode.java | 2 +- .../node/write/RelationalInsertTabletNode.java | 2 +- .../dataregion/Base32ObjectPath.java | 10 ++++++++++ .../storageengine/dataregion/IObjectPath.java | 4 ++++ .../dataregion/PlainObjectPath.java | 17 +++++++++++++++++ .../apache/iotdb/db/utils/ObjectTypeUtils.java | 4 ++-- .../plan/function/RecordObjectTypeTest.java | 6 ++++++ .../unary/scalar/ObjectTypeFunctionTest.java | 6 ++++++ .../object/ObjectTypeCompactionTest.java | 6 ++++++ 9 files changed, 53 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java index 15a9c12af20b..2b54c1dcc325 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java @@ -226,7 +226,7 @@ private void handleObjectValue( ByteBuffer valueBytes = ByteBuffer.allocate(relativePath.getSerializedSize() + Long.BYTES); valueBytes.putLong(offset + content.length); - relativePath.serialize(valueBytes); + relativePath.serializeToObjectValue(valueBytes); ((Binary) values[j]).setValues(valueBytes.array()); insertRowNode.setValues(values); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index 89c32c2f593b..f9fef9ee24d7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -477,7 +477,7 @@ private void handleObjectValue( if (isEoF) { ByteBuffer valueBytes = ByteBuffer.allocate(relativePath.getSerializedSize() + Long.BYTES); valueBytes.putLong(offset + content.length); - relativePath.serialize(valueBytes); + relativePath.serializeToObjectValue(valueBytes); ((Binary[]) columns[column])[j] = new Binary(valueBytes.array()); } else { ((Binary[]) columns[column])[j] = null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java index 273b1dc40075..a8e180401482 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java @@ -48,6 +48,11 @@ public IObjectPath deserializeFrom(ByteBuffer byteBuffer) { public IObjectPath deserializeFrom(InputStream inputStream) throws IOException { return deserialize(inputStream); } + + @Override + public IObjectPath deserializeFromObjectValue(ByteBuffer byteBuffer) { + return deserialize(byteBuffer); + } }; private static final Factory FACTORY = Base32ObjectPath::new; @@ -108,6 +113,11 @@ public int getSerializedSize() { return cnt; } + @Override + public void serializeToObjectValue(ByteBuffer byteBuffer) { + serialize(byteBuffer); + } + public static Base32ObjectPath deserialize(ByteBuffer byteBuffer) { int cnt = ReadWriteForEncodingUtils.readUnsignedVarInt(byteBuffer); String first = ReadWriteIOUtils.readVarIntString(byteBuffer); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java index cd16423f4883..bb4110429fff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java @@ -39,6 +39,8 @@ public interface IObjectPath { int getSerializedSize(); + void serializeToObjectValue(ByteBuffer byteBuffer); + interface Factory { IObjectPath create(int regionId, long time, IDeviceID iDeviceID, String measurement); @@ -55,6 +57,8 @@ interface Deserializer { IObjectPath deserializeFrom(InputStream inputStream) throws IOException; + IObjectPath deserializeFromObjectValue(ByteBuffer byteBuffer); + Deserializer DESERIALIZER = CONFIG.getRestrictObjectLimit() ? PlainObjectPath.getDESERIALIZER() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java index 35bc91a8b08c..7cf3fde4bab2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java @@ -27,6 +27,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; public class PlainObjectPath implements IObjectPath { @@ -43,6 +44,11 @@ public IObjectPath deserializeFrom(ByteBuffer byteBuffer) { public IObjectPath deserializeFrom(InputStream inputStream) throws IOException { return deserialize(inputStream); } + + @Override + public IObjectPath deserializeFromObjectValue(ByteBuffer byteBuffer) { + return deserializeObjectValue(byteBuffer); + } }; private static final Factory FACTORY = PlainObjectPath::new; @@ -81,6 +87,11 @@ public int getSerializedSize() { return ReadWriteIOUtils.sizeToWrite(filePath); } + @Override + public void serializeToObjectValue(ByteBuffer byteBuffer) { + byteBuffer.put(filePath.getBytes(StandardCharsets.UTF_8)); + } + public static PlainObjectPath deserialize(ByteBuffer byteBuffer) { String filePath = ReadWriteIOUtils.readString(byteBuffer); return new PlainObjectPath(filePath); @@ -91,6 +102,12 @@ public static PlainObjectPath deserialize(InputStream stream) throws IOException return new PlainObjectPath(filePath); } + public static PlainObjectPath deserializeObjectValue(ByteBuffer byteBuffer) { + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return new PlainObjectPath(new String(bytes, StandardCharsets.UTF_8)); + } + @Override public String toString() { return filePath; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java index 6e07f9af271f..328a18624a16 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java @@ -169,7 +169,7 @@ public static Pair parseObjectBinary(Binary binary) { ByteBuffer buffer = ByteBuffer.wrap(bytes); long length = buffer.getLong(); String relativeObjectFilePath = - IObjectPath.Deserializer.DESERIALIZER.deserializeFrom(buffer).toString(); + IObjectPath.Deserializer.DESERIALIZER.deserializeFromObjectValue(buffer).toString(); return new Pair<>(length, relativeObjectFilePath); } @@ -184,7 +184,7 @@ public static Optional getNullableObjectPathFromBinary( byte[] bytes = binary.getValues(); ByteBuffer buffer = ByteBuffer.wrap(bytes, 8, bytes.length - 8); String relativeObjectFilePath = - IObjectPath.Deserializer.DESERIALIZER.deserializeFrom(buffer).toString(); + IObjectPath.Deserializer.DESERIALIZER.deserializeFromObjectValue(buffer).toString(); return TIER_MANAGER.getAbsoluteObjectFilePath(relativeObjectFilePath, needTempFile); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java index e0a6ca88825e..17c3f13e36b4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.queryengine.plan.function; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator; @@ -50,10 +52,13 @@ public class RecordObjectTypeTest { + private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private File objectDir; @Before public void setup() { + config.setRestrictObjectLimit(true); try { objectDir = new File(TierManager.getInstance().getNextFolderForObjectFile()); } catch (DiskSpaceInsufficientException e) { @@ -69,6 +74,7 @@ public void tearDown() throws IOException { Files.delete(file.toPath()); } } + config.setRestrictObjectLimit(true); } @Test diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java index 0a0d73d2ac31..022257d1ee63 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; @@ -48,10 +50,13 @@ public class ObjectTypeFunctionTest { + private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private File objectDir; @Before public void setup() { + config.setRestrictObjectLimit(true); try { objectDir = new File(TierManager.getInstance().getNextFolderForObjectFile()); } catch (DiskSpaceInsufficientException e) { @@ -67,6 +72,7 @@ public void tearDown() throws IOException { Files.delete(file.toPath()); } } + config.setRestrictObjectLimit(false); } @Test diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java index 4bc11fbec572..e4f3a50a5fc4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java @@ -23,6 +23,8 @@ import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema; import org.apache.iotdb.commons.schema.table.column.TagColumnSchema; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; @@ -77,10 +79,13 @@ public class ObjectTypeCompactionTest extends AbstractCompactionTest { private String threadName; private File objectDir; + private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + @Before @Override public void setUp() throws IOException, WriteProcessException, MetadataException, InterruptedException { + config.setRestrictObjectLimit(true); this.threadName = Thread.currentThread().getName(); Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1"); DataNodeTableCache.getInstance().invalid(this.COMPACTION_TEST_SG); @@ -105,6 +110,7 @@ public void tearDown() throws IOException, StorageEngineException { Files.delete(file.toPath()); } } + config.setRestrictObjectLimit(false); } public void createTable(String tableName, long ttl) { From 33aeaee64b8bcac5769368e94345b49e1e00f6ab Mon Sep 17 00:00:00 2001 From: HTHou Date: Fri, 5 Dec 2025 16:00:25 +0800 Subject: [PATCH 05/15] reduce bytes copy --- .../iotdb/db/storageengine/dataregion/PlainObjectPath.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java index 7cf3fde4bab2..ea54bdc22a7e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java @@ -103,9 +103,7 @@ public static PlainObjectPath deserialize(InputStream stream) throws IOException } public static PlainObjectPath deserializeObjectValue(ByteBuffer byteBuffer) { - byte[] bytes = new byte[byteBuffer.remaining()]; - byteBuffer.get(bytes); - return new PlainObjectPath(new String(bytes, StandardCharsets.UTF_8)); + return new PlainObjectPath(StandardCharsets.UTF_8.decode(byteBuffer).toString()); } @Override From 52101673132c31fad702ef529a57a4548c892d07 Mon Sep 17 00:00:00 2001 From: HTHou Date: Mon, 8 Dec 2025 11:33:35 +0800 Subject: [PATCH 06/15] implement unchangeable config --- .../iotdb/confignode/conf/ConfigNodeConfig.java | 10 ++++++++++ .../iotdb/confignode/conf/SystemPropertiesUtils.java | 12 ++++++++++++ .../iotdb/confignode/manager/node/NodeManager.java | 1 + .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 1 + .../src/main/thrift/confignode.thrift | 1 + 5 files changed, 25 insertions(+) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 88e8d76001dc..34b69382b8fd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -316,6 +316,8 @@ public class ConfigNodeConfig { private long forceWalPeriodForConfigNodeSimpleInMs = 100; + private boolean restrictObjectLimit = false; + public ConfigNodeConfig() { // empty constructor } @@ -1275,4 +1277,12 @@ public long getFailureDetectorPhiAcceptablePauseInMs() { public void setFailureDetectorPhiAcceptablePauseInMs(long failureDetectorPhiAcceptablePauseInMs) { this.failureDetectorPhiAcceptablePauseInMs = failureDetectorPhiAcceptablePauseInMs; } + + public boolean getRestrictObjectLimit() { + return restrictObjectLimit; + } + + public void setRestrictObjectLimit(boolean restrictObjectLimit) { + this.restrictObjectLimit = restrictObjectLimit; + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java index 529f15d06cd4..9cf6b2dac963 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java @@ -213,6 +213,16 @@ public static void checkSystemProperties() throws IOException { COMMON_CONFIG.setEnableGrantOption(enableGrantOption); } } + + if (systemProperties.getProperty("restrict_object_limit", null) != null) { + boolean restrictObjectLimit = + Boolean.parseBoolean(systemProperties.getProperty("restrict_object_limit")); + if (restrictObjectLimit != conf.getRestrictObjectLimit()) { + LOGGER.warn( + format, "restrict_object_limit", conf.getRestrictObjectLimit(), restrictObjectLimit); + conf.setRestrictObjectLimit(restrictObjectLimit); + } + } } /** @@ -286,6 +296,8 @@ public static void storeSystemParameters() throws IOException { "tag_attribute_total_size", String.valueOf(COMMON_CONFIG.getTagAttributeTotalSize())); systemProperties.setProperty( "enable_grant_option", String.valueOf(COMMON_CONFIG.getEnableGrantOption())); + systemProperties.setProperty( + "restrict_object_limit", String.valueOf(conf.getRestrictObjectLimit())); systemPropertiesHandler.overwrite(systemProperties); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index e8083b79b68a..234227286df3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -179,6 +179,7 @@ private void setGlobalConfig(ConfigurationResp dataSet) { globalConfig.setSchemaEngineMode(commonConfig.getSchemaEngineMode()); globalConfig.setTagAttributeTotalSize(commonConfig.getTagAttributeTotalSize()); globalConfig.setEnableGrantOption(commonConfig.getEnableGrantOption()); + globalConfig.setRestrictObjectLimit(configNodeConfig.getRestrictObjectLimit()); dataSet.setGlobalConfig(globalConfig); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index c194f304e7f6..dac1eb5c3653 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -2717,6 +2717,7 @@ public void loadGlobalConfig(TGlobalConfig globalConfig) { conf.setSeriesPartitionExecutorClass(globalConfig.getSeriesPartitionExecutorClass()); conf.setSeriesPartitionSlotNum(globalConfig.getSeriesPartitionSlotNum()); conf.setReadConsistencyLevel(globalConfig.getReadConsistencyLevel()); + conf.setRestrictObjectLimit(globalConfig.isRestrictObjectLimit()); } public void loadRatisConfig(TRatisConfig ratisConfig) { diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index d8f6318063eb..e5c222bb56e9 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -50,6 +50,7 @@ struct TGlobalConfig { 11: optional i32 tagAttributeTotalSize 12: optional bool isEnterprise 13: optional i64 timePartitionOrigin + 14: optional bool restrictObjectLimit } struct TRatisConfig { From 431369a684ea23089acd9a33080b61a7e8965d9e Mon Sep 17 00:00:00 2001 From: HTHou Date: Mon, 8 Dec 2025 15:51:15 +0800 Subject: [PATCH 07/15] Add IT --- .../env/cluster/config/MppCommonConfig.java | 6 + .../cluster/config/MppSharedCommonConfig.java | 7 + .../it/env/cluster/node/DataNodeWrapper.java | 4 + .../env/remote/config/RemoteCommonConfig.java | 5 + .../apache/iotdb/itbase/env/CommonConfig.java | 2 + .../it/session/IoTDBObjectInsertIT.java | 333 ++++++++++++++++++ .../it/session/IoTDBObjectInsertIT2.java | 172 +++++++++ .../it/session/IoTDBSessionRelationalIT.java | 181 ---------- .../confignode/conf/ConfigNodeDescriptor.java | 5 + .../node/write/RelationalInsertRowsNode.java | 2 +- .../write/RelationalInsertTabletNode.java | 6 +- .../dataregion/Base32ObjectPath.java | 5 + .../storageengine/dataregion/IObjectPath.java | 2 + .../dataregion/PlainObjectPath.java | 5 + 14 files changed, 552 insertions(+), 183 deletions(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT.java create mode 100644 integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT2.java diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 6cb7e0082426..e7f4b228d4f1 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -651,6 +651,12 @@ public CommonConfig setAuditableOperationResult(String auditableOperationResult) return this; } + @Override + public CommonConfig setRestrictObjectLimit(boolean restrictObjectLimit) { + setProperty("restrict_object_limit", String.valueOf(restrictObjectLimit)); + return this; + } + // For part of the log directory public String getClusterConfigStr() { return fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS)) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 1b4801f56922..abb0f8bf8bb0 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -684,4 +684,11 @@ public CommonConfig setAuditableOperationResult(String auditableOperationResult) cnConfig.setAuditableOperationResult(auditableOperationResult); return this; } + + @Override + public CommonConfig setRestrictObjectLimit(boolean restrictObjectLimit) { + cnConfig.setRestrictObjectLimit(restrictObjectLimit); + dnConfig.setRestrictObjectLimit(restrictObjectLimit); + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java index f08c085bc6c4..96a0fbe27e05 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java @@ -169,6 +169,10 @@ public String getSystemDir() { return getDataNodeDir() + File.separator + "system"; } + public String getDataNodeObjectDir() { + return getDataNodeDir() + File.separator + "data" + File.separator + "object"; + } + @Override protected MppJVMConfig initVMConfig() { return MppJVMConfig.builder() diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index fdcff20dbc85..148046423cd0 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -477,4 +477,9 @@ public CommonConfig setAuditableOperationLevel(String auditableOperationLevel) { public CommonConfig setAuditableOperationResult(String auditableOperationResult) { return this; } + + @Override + public CommonConfig setRestrictObjectLimit(boolean restrictObjectLimit) { + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index f27bdc8d66da..531f94eec4b1 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -211,4 +211,6 @@ default CommonConfig setDefaultDatabaseLevel(int defaultDatabaseLevel) { CommonConfig setAuditableOperationLevel(String auditableOperationLevel); CommonConfig setAuditableOperationResult(String auditableOperationResult); + + CommonConfig setRestrictObjectLimit(boolean restrictObjectLimit); } diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT.java new file mode 100644 index 000000000000..6ee922704a9f --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.relational.it.session; + +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.rpc.TSStatusCode; + +import com.google.common.io.BaseEncoding; +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BytesUtils; +import org.apache.tsfile.write.record.Tablet; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertNull; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBObjectInsertIT { + + @BeforeClass + public static void classSetUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + } + + @Before + public void setUp() throws Exception { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS db1"); + } + } + + @After + public void tearDown() throws Exception { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("DROP DATABASE IF EXISTS db1"); + } + } + + @AfterClass + public static void classTearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void insertObjectTest() + throws IoTDBConnectionException, StatementExecutionException, IOException { + String testObject = + System.getProperty("user.dir") + + File.separator + + "target" + + File.separator + + "test-classes" + + File.separator + + "ainode-example" + + File.separator + + "model.pt"; + File object = new File(testObject); + + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("USE \"db1\""); + // insert table data by tablet + List columnNameList = + Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file"); + List dataTypeList = + Arrays.asList( + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.FLOAT, + TSDataType.OBJECT); + List columnTypeList = + new ArrayList<>( + Arrays.asList( + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.FIELD, + ColumnCategory.FIELD)); + Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, columnTypeList, 1); + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, 1); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue(rowIndex, 4, true, 0, Files.readAllBytes(Paths.get(testObject))); + session.insert(tablet); + tablet.reset(); + + try (SessionDataSet dataSet = + session.executeQueryStatement("select file from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + Assert.assertEquals( + BytesUtils.parseObjectByteArrayToString(BytesUtils.longToBytes(object.length())), + iterator.getString(1)); + } + } + + try (SessionDataSet dataSet = + session.executeQueryStatement( + "select READ_OBJECT(file) from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + Binary binary = iterator.getBlob(1); + Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), binary.getValues()); + } + } + } + // test object file path + boolean success = false; + for (DataNodeWrapper dataNodeWrapper : EnvFactory.getEnv().getDataNodeWrapperList()) { + String objectDirStr = dataNodeWrapper.getDataNodeObjectDir(); + File objectDir = new File(objectDirStr); + if (objectDir.exists() && objectDir.isDirectory()) { + File[] regionDirs = objectDir.listFiles(); + if (regionDirs != null) { + for (File regionDir : regionDirs) { + if (regionDir.isDirectory()) { + File objectFile = + new File( + regionDir, + convertPathString("object_table") + + File.separator + + convertPathString("1") + + File.separator + + convertPathString("5") + + File.separator + + convertPathString("3") + + File.separator + + convertPathString("file") + + File.separator + + "1.bin"); + if (objectFile.exists() && objectFile.isFile()) { + success = true; + } + } + } + } + } + } + Assert.assertTrue(success); + } + + @Test + public void insertObjectSegmentsTest() + throws IoTDBConnectionException, StatementExecutionException, IOException { + String testObject = + System.getProperty("user.dir") + + File.separator + + "target" + + File.separator + + "test-classes" + + File.separator + + "ainode-example" + + File.separator + + "model.pt"; + byte[] objectBytes = Files.readAllBytes(Paths.get(testObject)); + List objectSegments = new ArrayList<>(); + for (int i = 0; i < objectBytes.length; i += 512) { + objectSegments.add(Arrays.copyOfRange(objectBytes, i, Math.min(i + 512, objectBytes.length))); + } + + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("USE \"db1\""); + // insert table data by tablet + List columnNameList = + Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file"); + List dataTypeList = + Arrays.asList( + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.FLOAT, + TSDataType.OBJECT); + List columnTypeList = + new ArrayList<>( + Arrays.asList( + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.FIELD, + ColumnCategory.FIELD)); + Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, columnTypeList, 1); + for (int i = 0; i < objectSegments.size() - 1; i++) { + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, 1); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue(rowIndex, 4, false, i * 512L, objectSegments.get(i)); + session.insert(tablet); + tablet.reset(); + } + + try (SessionDataSet dataSet = + session.executeQueryStatement("select file from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + assertNull(iterator.getString(1)); + } + } + + // insert segment with wrong offset + try { + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, 1); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue(rowIndex, 4, false, 512L, objectSegments.get(1)); + session.insert(tablet); + } catch (StatementExecutionException e) { + Assert.assertEquals(TSStatusCode.OBJECT_INSERT_ERROR.getStatusCode(), e.getStatusCode()); + Assert.assertEquals( + String.format( + "741: The file length %d is not equal to the offset %d", + ((objectSegments.size() - 1) * 512), 512L), + e.getMessage()); + } finally { + tablet.reset(); + } + + // last segment + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, 1); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue( + rowIndex, + 4, + true, + (objectSegments.size() - 1) * 512L, + objectSegments.get(objectSegments.size() - 1)); + session.insert(tablet); + tablet.reset(); + + try (SessionDataSet dataSet = + session.executeQueryStatement( + "select READ_OBJECT(file) from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + Binary binary = iterator.getBlob(1); + Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), binary.getValues()); + } + } + } + + // test object file path + boolean success = false; + for (DataNodeWrapper dataNodeWrapper : EnvFactory.getEnv().getDataNodeWrapperList()) { + String objectDirStr = dataNodeWrapper.getDataNodeObjectDir(); + File objectDir = new File(objectDirStr); + if (objectDir.exists() && objectDir.isDirectory()) { + File[] regionDirs = objectDir.listFiles(); + if (regionDirs != null) { + for (File regionDir : regionDirs) { + if (regionDir.isDirectory()) { + File objectFile = + new File( + regionDir, + convertPathString("object_table") + + File.separator + + convertPathString("1") + + File.separator + + convertPathString("5") + + File.separator + + convertPathString("3") + + File.separator + + convertPathString("file") + + File.separator + + "1.bin"); + if (objectFile.exists() && objectFile.isFile()) { + success = true; + } + } + } + } + } + } + Assert.assertTrue(success); + } + + protected String convertPathString(String path) { + return BaseEncoding.base32().omitPadding().encode(path.getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT2.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT2.java new file mode 100644 index 000000000000..f00639b38c54 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT2.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.session; + +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BytesUtils; +import org.apache.tsfile.write.record.Tablet; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBObjectInsertIT2 extends IoTDBObjectInsertIT { + + @BeforeClass + public static void classSetUp() throws Exception { + EnvFactory.getEnv().getConfig().getCommonConfig().setRestrictObjectLimit(true); + EnvFactory.getEnv().initClusterEnvironment(); + } + + @AfterClass + public static void classTearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void changeRestrictObjectLimitTest() + throws IoTDBConnectionException, StatementExecutionException, IOException { + EnvFactory.getEnv().getConfig().getCommonConfig().setRestrictObjectLimit(false); + TestUtils.restartCluster(EnvFactory.getEnv()); + String testObject = + System.getProperty("user.dir") + + File.separator + + "target" + + File.separator + + "test-classes" + + File.separator + + "ainode-example" + + File.separator + + "model.pt"; + File object = new File(testObject); + + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("USE \"db1\""); + // insert table data by tablet + List columnNameList = + Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file"); + List dataTypeList = + Arrays.asList( + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.FLOAT, + TSDataType.OBJECT); + List columnTypeList = + new ArrayList<>( + Arrays.asList( + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.FIELD, + ColumnCategory.FIELD)); + Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, columnTypeList, 1); + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, 1); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue(rowIndex, 4, true, 0, Files.readAllBytes(Paths.get(testObject))); + session.insert(tablet); + tablet.reset(); + + try (SessionDataSet dataSet = + session.executeQueryStatement("select file from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + Assert.assertEquals( + BytesUtils.parseObjectByteArrayToString(BytesUtils.longToBytes(object.length())), + iterator.getString(1)); + } + } + + try (SessionDataSet dataSet = + session.executeQueryStatement( + "select READ_OBJECT(file) from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + Binary binary = iterator.getBlob(1); + Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), binary.getValues()); + } + } + } + // test object file path + boolean success = false; + for (DataNodeWrapper dataNodeWrapper : EnvFactory.getEnv().getDataNodeWrapperList()) { + String objectDirStr = dataNodeWrapper.getDataNodeObjectDir(); + File objectDir = new File(objectDirStr); + if (objectDir.exists() && objectDir.isDirectory()) { + File[] regionDirs = objectDir.listFiles(); + if (regionDirs != null) { + for (File regionDir : regionDirs) { + if (regionDir.isDirectory()) { + File objectFile = + new File( + regionDir, + convertPathString("object_table") + + File.separator + + convertPathString("1") + + File.separator + + convertPathString("5") + + File.separator + + convertPathString("3") + + File.separator + + convertPathString("file") + + File.separator + + "1.bin"); + if (objectFile.exists() && objectFile.isFile()) { + success = true; + } + } + } + } + } + } + Assert.assertTrue(success); + } + + @Override + protected String convertPathString(String path) { + return path; + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java index 7f63d1903029..9c02ac94208a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java @@ -36,7 +36,6 @@ import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.session.TableSessionBuilder; import org.apache.tsfile.enums.ColumnCategory; @@ -45,7 +44,6 @@ import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.read.common.RowRecord; import org.apache.tsfile.utils.Binary; -import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -63,8 +61,6 @@ import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Paths; import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; @@ -851,183 +847,6 @@ public void insertRelationalTabletWithCacheLeaderTest() } } - @Test - public void insertObjectTest() - throws IoTDBConnectionException, StatementExecutionException, IOException { - String testObject = - System.getProperty("user.dir") - + File.separator - + "target" - + File.separator - + "test-classes" - + File.separator - + "ainode-example" - + File.separator - + "model.pt"; - File object = new File(testObject); - - try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { - session.executeNonQueryStatement("USE \"db1\""); - // insert table data by tablet - List columnNameList = - Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file"); - List dataTypeList = - Arrays.asList( - TSDataType.STRING, - TSDataType.STRING, - TSDataType.STRING, - TSDataType.FLOAT, - TSDataType.OBJECT); - List columnTypeList = - new ArrayList<>( - Arrays.asList( - ColumnCategory.TAG, - ColumnCategory.TAG, - ColumnCategory.TAG, - ColumnCategory.FIELD, - ColumnCategory.FIELD)); - Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, columnTypeList, 1); - int rowIndex = tablet.getRowSize(); - tablet.addTimestamp(rowIndex, 1); - tablet.addValue(rowIndex, 0, "1"); - tablet.addValue(rowIndex, 1, "5"); - tablet.addValue(rowIndex, 2, "3"); - tablet.addValue(rowIndex, 3, 37.6F); - tablet.addValue(rowIndex, 4, true, 0, Files.readAllBytes(Paths.get(testObject))); - session.insert(tablet); - tablet.reset(); - - try (SessionDataSet dataSet = - session.executeQueryStatement("select file from object_table where time = 1")) { - SessionDataSet.DataIterator iterator = dataSet.iterator(); - while (iterator.next()) { - Assert.assertEquals( - BytesUtils.parseObjectByteArrayToString(BytesUtils.longToBytes(object.length())), - iterator.getString(1)); - } - } - - try (SessionDataSet dataSet = - session.executeQueryStatement( - "select READ_OBJECT(file) from object_table where time = 1")) { - SessionDataSet.DataIterator iterator = dataSet.iterator(); - while (iterator.next()) { - Binary binary = iterator.getBlob(1); - Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), binary.getValues()); - } - } - } - } - - @Test - public void insertObjectSegmentsTest() - throws IoTDBConnectionException, StatementExecutionException, IOException { - String testObject = - System.getProperty("user.dir") - + File.separator - + "target" - + File.separator - + "test-classes" - + File.separator - + "ainode-example" - + File.separator - + "model.pt"; - byte[] objectBytes = Files.readAllBytes(Paths.get(testObject)); - List objectSegments = new ArrayList<>(); - for (int i = 0; i < objectBytes.length; i += 512) { - objectSegments.add(Arrays.copyOfRange(objectBytes, i, Math.min(i + 512, objectBytes.length))); - } - - try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { - session.executeNonQueryStatement("USE \"db1\""); - // insert table data by tablet - List columnNameList = - Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file"); - List dataTypeList = - Arrays.asList( - TSDataType.STRING, - TSDataType.STRING, - TSDataType.STRING, - TSDataType.FLOAT, - TSDataType.OBJECT); - List columnTypeList = - new ArrayList<>( - Arrays.asList( - ColumnCategory.TAG, - ColumnCategory.TAG, - ColumnCategory.TAG, - ColumnCategory.FIELD, - ColumnCategory.FIELD)); - Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, columnTypeList, 1); - for (int i = 0; i < objectSegments.size() - 1; i++) { - int rowIndex = tablet.getRowSize(); - tablet.addTimestamp(rowIndex, 1); - tablet.addValue(rowIndex, 0, "1"); - tablet.addValue(rowIndex, 1, "5"); - tablet.addValue(rowIndex, 2, "3"); - tablet.addValue(rowIndex, 3, 37.6F); - tablet.addValue(rowIndex, 4, false, i * 512L, objectSegments.get(i)); - session.insert(tablet); - tablet.reset(); - } - - try (SessionDataSet dataSet = - session.executeQueryStatement("select file from object_table where time = 1")) { - SessionDataSet.DataIterator iterator = dataSet.iterator(); - while (iterator.next()) { - assertNull(iterator.getString(1)); - } - } - - // insert segment with wrong offset - try { - int rowIndex = tablet.getRowSize(); - tablet.addTimestamp(rowIndex, 1); - tablet.addValue(rowIndex, 0, "1"); - tablet.addValue(rowIndex, 1, "5"); - tablet.addValue(rowIndex, 2, "3"); - tablet.addValue(rowIndex, 3, 37.6F); - tablet.addValue(rowIndex, 4, false, 512L, objectSegments.get(1)); - session.insert(tablet); - } catch (StatementExecutionException e) { - Assert.assertEquals(TSStatusCode.OBJECT_INSERT_ERROR.getStatusCode(), e.getStatusCode()); - Assert.assertEquals( - String.format( - "741: The file length %d is not equal to the offset %d", - ((objectSegments.size() - 1) * 512), 512L), - e.getMessage()); - } finally { - tablet.reset(); - } - - // last segment - int rowIndex = tablet.getRowSize(); - tablet.addTimestamp(rowIndex, 1); - tablet.addValue(rowIndex, 0, "1"); - tablet.addValue(rowIndex, 1, "5"); - tablet.addValue(rowIndex, 2, "3"); - tablet.addValue(rowIndex, 3, 37.6F); - tablet.addValue( - rowIndex, - 4, - true, - (objectSegments.size() - 1) * 512L, - objectSegments.get(objectSegments.size() - 1)); - session.insert(tablet); - tablet.reset(); - - try (SessionDataSet dataSet = - session.executeQueryStatement( - "select READ_OBJECT(file) from object_table where time = 1")) { - SessionDataSet.DataIterator iterator = dataSet.iterator(); - while (iterator.next()) { - Binary binary = iterator.getBlob(1); - Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), binary.getValues()); - } - } - } - } - @Test public void autoCreateNontagColumnTest() throws IoTDBConnectionException, StatementExecutionException { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 0ea7a278732e..0d3968d3f7a2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -368,6 +368,11 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio readConsistencyLevel)); } + conf.setRestrictObjectLimit( + Boolean.parseBoolean( + properties.getProperty( + "restrict_object_limit", String.valueOf(conf.getRestrictObjectLimit())))); + // commons commonDescriptor.loadCommonProps(properties); commonDescriptor.initCommonConfigDir(conf.getSystemDir()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java index 2b54c1dcc325..920b696abe62 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java @@ -224,7 +224,7 @@ private void handleObjectValue( writePlanNodeList.add(objectNode); if (isEoF) { ByteBuffer valueBytes = - ByteBuffer.allocate(relativePath.getSerializedSize() + Long.BYTES); + ByteBuffer.allocate(relativePath.getSerializeSizeToObjectValue() + Long.BYTES); valueBytes.putLong(offset + content.length); relativePath.serializeToObjectValue(valueBytes); ((Binary) values[j]).setValues(valueBytes.array()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index f9fef9ee24d7..1f6a8011686a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -463,6 +463,9 @@ private void handleObjectValue( Map.Entry> entry, List result) { for (int j = startRow; j < endRow; j++) { + if (((Binary[]) columns[column])[j] == null) { + continue; + } byte[] binary = ((Binary[]) columns[column])[j].getValues(); ByteBuffer buffer = ByteBuffer.wrap(binary); boolean isEoF = buffer.get() == 1; @@ -475,7 +478,8 @@ private void handleObjectValue( objectNode.setDataRegionReplicaSet(entry.getKey()); result.add(objectNode); if (isEoF) { - ByteBuffer valueBytes = ByteBuffer.allocate(relativePath.getSerializedSize() + Long.BYTES); + ByteBuffer valueBytes = + ByteBuffer.allocate(relativePath.getSerializeSizeToObjectValue() + Long.BYTES); valueBytes.putLong(offset + content.length); relativePath.serializeToObjectValue(valueBytes); ((Binary[]) columns[column])[j] = new Binary(valueBytes.array()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java index a8e180401482..c9f2b6b5637d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java @@ -118,6 +118,11 @@ public void serializeToObjectValue(ByteBuffer byteBuffer) { serialize(byteBuffer); } + @Override + public int getSerializeSizeToObjectValue() { + return getSerializedSize(); + } + public static Base32ObjectPath deserialize(ByteBuffer byteBuffer) { int cnt = ReadWriteForEncodingUtils.readUnsignedVarInt(byteBuffer); String first = ReadWriteIOUtils.readVarIntString(byteBuffer); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java index bb4110429fff..f07aaf9037b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java @@ -41,6 +41,8 @@ public interface IObjectPath { void serializeToObjectValue(ByteBuffer byteBuffer); + int getSerializeSizeToObjectValue(); + interface Factory { IObjectPath create(int regionId, long time, IDeviceID iDeviceID, String measurement); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java index ea54bdc22a7e..77818f9a3909 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java @@ -92,6 +92,11 @@ public void serializeToObjectValue(ByteBuffer byteBuffer) { byteBuffer.put(filePath.getBytes(StandardCharsets.UTF_8)); } + @Override + public int getSerializeSizeToObjectValue() { + return filePath.getBytes(StandardCharsets.UTF_8).length; + } + public static PlainObjectPath deserialize(ByteBuffer byteBuffer) { String filePath = ReadWriteIOUtils.readString(byteBuffer); return new PlainObjectPath(filePath); From 8d14cadd5b7d867386a018cd432ee9d2a7963708 Mon Sep 17 00:00:00 2001 From: HTHou Date: Mon, 8 Dec 2025 15:55:05 +0800 Subject: [PATCH 08/15] add config --- .../assembly/resources/conf/iotdb-system.properties.template | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 5a32d6a12314..d64ee261c2c0 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -1301,6 +1301,11 @@ tier_ttl_in_ms=-1 # Datatype: long max_object_file_size_in_byte=4294967296 +# There are no special restrictions on table names, column names, and device names of the OBJECT type. +# effectiveMode: first_start +# Datatype: boolean +restrict_object_limit=false + #################### ### Compaction Configurations #################### From f2b599e01dc2345a66a0d68ac630803b9aee0efa Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 9 Dec 2025 17:19:11 +0800 Subject: [PATCH 09/15] replace region id for object binary --- .../execute/utils/CompactionUtils.java | 15 ++++-- .../utils/MultiTsFileDeviceIterator.java | 3 +- .../FastAlignedSeriesCompactionExecutor.java | 3 +- .../reader/CompactionAlignedChunkReader.java | 8 ++-- .../reader/chunk/DiskAlignedChunkLoader.java | 16 ++++++- .../iotdb/db/utils/ObjectTypeUtils.java | 46 +++++++++++++++++++ 6 files changed, 80 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java index 0262eb361700..bac487291671 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java @@ -533,7 +533,8 @@ public static void removeDeletedObjectFiles( TsFileSequenceReader reader, List alignedChunkMetadataList, List timeMods, - List> valueMods) + List> valueMods, + int currentRegionId) throws IOException { if (alignedChunkMetadataList.isEmpty()) { return; @@ -578,7 +579,8 @@ public static void removeDeletedObjectFiles( objectColumnIndexList, timeDeletionIntervalList, objectDeletionIntervalList, - deletionCursors); + deletionCursors, + currentRegionId); } } @@ -589,7 +591,8 @@ private static void removeDeletedObjectFiles( List objectColumnIndexList, List timeDeletions, List> objectDeletions, - int[] deletionCursors) + int[] deletionCursors, + int currentRegionId) throws IOException { Chunk timeChunk = reader.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata()); @@ -612,6 +615,12 @@ private static void removeDeletedObjectFiles( continue; } Chunk chunk = reader.readMemChunk(valueChunkMetadata); + if (chunk != null) { + chunk + .getHeader() + .setReplaceDecoder( + decoder -> ObjectTypeUtils.getReplaceDecoder(decoder, currentRegionId)); + } valueChunks.add(chunk); valuePages.add( chunk == null diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index 7b99541c006e..c07f5a1ad404 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -489,7 +489,8 @@ private void applyModificationForAlignedChunkMetadataList( readerMap.get(tsFileResource), alignedChunkMetadataList, Collections.singletonList(ttlDeletion), - modificationForValueColumns.stream().map(v -> emptyList).collect(Collectors.toList())); + modificationForValueColumns.stream().map(v -> emptyList).collect(Collectors.toList()), + tsFileResource.getTsFileID().regionId); } ModificationUtils.modifyAlignedChunkMetaData( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java index 166417a97eaf..7a8e884a4913 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java @@ -284,7 +284,8 @@ protected List getAlignedChunkMetadataList(TsFileR readerCacheMap.get(resource), alignedChunkMetadataList, Collections.singletonList(ttlDeletion), - valueModifications.stream().map(v -> emptyList).collect(Collectors.toList())); + valueModifications.stream().map(v -> emptyList).collect(Collectors.toList()), + resource.getTsFileID().regionId); } // modify aligned chunk metadatas diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java index a94150deca19..3cbc0c2fbae1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java @@ -92,7 +92,7 @@ public IPointReader getPagePointReader( ByteBuffer compressedTimePageData, List compressedValuePageDatas) throws IOException { - return getPontReader( + return getPointReader( timePageHeader, valuePageHeaders, compressedTimePageData, @@ -106,11 +106,11 @@ public IPointReader getBatchedPagePointReader( ByteBuffer compressedTimePageData, List compressedValuePageDatas) throws IOException { - return getPontReader( + return getPointReader( timePageHeader, valuePageHeaders, compressedTimePageData, compressedValuePageDatas, false); } - private IPointReader getPontReader( + private IPointReader getPointReader( PageHeader timePageHeader, List valuePageHeaders, ByteBuffer compressedTimePageData, @@ -146,7 +146,7 @@ private IPointReader getPontReader( valuePageHeaders.get(i), uncompressedPageData, valueType, - Decoder.getDecoderByType(valueChunkHeader.getEncodingType(), valueType)); + valueChunkHeader.calculateDecoderForNonTimeChunk()); valuePageReader.setDeleteIntervalList(valueDeleteIntervalList.get(i)); valuePageReaders.add(valuePageReader); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java index a7c6eb96d429..27883b34e9ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java @@ -24,7 +24,9 @@ import org.apache.iotdb.db.storageengine.buffer.ChunkCache; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.utils.ObjectTypeUtils; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IChunkMetadata; @@ -92,7 +94,7 @@ public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter globalTi context); List valueChunkList = new ArrayList<>(); for (IChunkMetadata valueChunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) { - valueChunkList.add( + Chunk chunk = valueChunkMetadata == null ? null : ChunkCache.getInstance() @@ -104,7 +106,17 @@ public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter globalTi resource.isClosed()), valueChunkMetadata.getDeleteIntervalList(), valueChunkMetadata.getStatistics(), - context)); + context); + final TsFileID tsFileID = getTsFileID(); + if (chunk != null + && tsFileID.regionId > 0 + && chunkMetaData.getDataType() == TSDataType.OBJECT) { + chunk + .getHeader() + .setReplaceDecoder( + decoder -> ObjectTypeUtils.getReplaceDecoder(decoder, tsFileID.regionId)); + } + valueChunkList.add(chunk); } long t2 = System.nanoTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java index 328a18624a16..60d73fd6b57c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java @@ -35,7 +35,10 @@ import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.encoding.decoder.Decoder; +import org.apache.tsfile.encoding.decoder.DecoderWrapper; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +47,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.Collections; @@ -147,6 +152,47 @@ private static ByteBuffer readObjectContentFromRemoteFile( return buffer; } + public static Binary generateObjectBinary(long objectSize, String relativePath) { + byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8); + byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES]; + System.arraycopy(BytesUtils.longToBytes(objectSize), 0, valueBytes, 0, Long.BYTES); + System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length); + return new Binary(valueBytes); + } + + public static DecoderWrapper getReplaceDecoder(final Decoder decoder, final int newRegionId) { + return new ObjectRegionIdReplaceDecoder(decoder, newRegionId); + } + + private static class ObjectRegionIdReplaceDecoder extends DecoderWrapper { + + private final int newRegionId; + + public ObjectRegionIdReplaceDecoder(Decoder decoder, int newRegionId) { + super(decoder); + this.newRegionId = newRegionId; + } + + @Override + public Binary readBinary(ByteBuffer buffer) { + Binary originValue = originDecoder.readBinary(buffer); + Pair pair = ObjectTypeUtils.parseObjectBinary(originValue); + try { + Path path = Paths.get(pair.getRight()); + int regionId = Integer.parseInt(path.getName(0).toString()); + if (regionId == newRegionId) { + return originValue; + } + String newPath = pair.getRight().replaceFirst(regionId + "", newRegionId + ""); + return ObjectTypeUtils.generateObjectBinary(pair.getLeft(), newPath); + } catch (NumberFormatException e) { + throw new IoTDBRuntimeException( + "wrong object file path: " + pair.getRight(), + TSStatusCode.OBJECT_READ_ERROR.getStatusCode()); + } + } + } + public static int getActualReadSize(String filePath, long fileSize, long offset, long length) { if (offset >= fileSize) { throw new SemanticException( From a0a716bc2b2377514ec81e7a5043a14a2b4e3205 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 9 Dec 2025 18:01:22 +0800 Subject: [PATCH 10/15] fix ut --- .../object/ObjectTypeCompactionTest.java | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java index e4f3a50a5fc4..9469fc1b118a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.schema.table.column.TagColumnSchema; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; @@ -78,6 +79,7 @@ public class ObjectTypeCompactionTest extends AbstractCompactionTest { private String threadName; private File objectDir; + private File regionDir; private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @@ -93,6 +95,8 @@ public void setUp() super.setUp(); try { objectDir = new File(TierManager.getInstance().getNextFolderForObjectFile()); + regionDir = new File(objectDir, "0"); + regionDir.mkdirs(); } catch (DiskSpaceInsufficientException e) { throw new RuntimeException(e); } @@ -107,7 +111,7 @@ public void tearDown() throws IOException, StorageEngineException { File[] files = objectDir.listFiles(); if (files != null) { for (File file : files) { - Files.delete(file.toPath()); + FileUtils.deleteFileOrDirectory(file); } } config.setRestrictObjectLimit(false); @@ -126,9 +130,9 @@ public void createTable(String tableName, long ttl) { @Test public void testSeqCompactionWithTTL() throws IOException, WriteProcessException { Pair pair1 = - generateTsFileAndObject(true, System.currentTimeMillis() - 10000); + generateTsFileAndObject(true, System.currentTimeMillis() - 10000, 0); Pair pair2 = - generateTsFileAndObject(true, System.currentTimeMillis() + 1000000); + generateTsFileAndObject(true, System.currentTimeMillis() + 1000000, 100); tsFileManager.add(pair1.getLeft(), true); tsFileManager.add(pair2.getLeft(), true); InnerSpaceCompactionTask task = @@ -147,9 +151,9 @@ public void testSeqCompactionWithTTL() throws IOException, WriteProcessException @Test public void testUnseqCompactionWithTTL() throws IOException, WriteProcessException { Pair pair1 = - generateTsFileAndObject(false, System.currentTimeMillis() + 100000); + generateTsFileAndObject(false, System.currentTimeMillis() + 100000, 1); Pair pair2 = - generateTsFileAndObject(false, System.currentTimeMillis() - 1000000); + generateTsFileAndObject(false, System.currentTimeMillis() - 1000000, 0); tsFileManager.add(pair1.getLeft(), false); tsFileManager.add(pair2.getLeft(), false); InnerSpaceCompactionTask task = @@ -168,9 +172,9 @@ public void testUnseqCompactionWithTTL() throws IOException, WriteProcessExcepti @Test public void testUnseqCompactionWithReadPointWithTTL() throws IOException, WriteProcessException { Pair pair1 = - generateTsFileAndObject(false, System.currentTimeMillis() + 100000); + generateTsFileAndObject(false, System.currentTimeMillis() + 100000, 0); Pair pair2 = - generateTsFileAndObject(false, System.currentTimeMillis() - 1000000); + generateTsFileAndObject(false, System.currentTimeMillis() - 1000000, 0); tsFileManager.add(pair1.getLeft(), false); tsFileManager.add(pair2.getLeft(), false); InnerSpaceCompactionTask task = @@ -189,9 +193,9 @@ public void testUnseqCompactionWithReadPointWithTTL() throws IOException, WriteP @Test public void testCrossCompactionWithTTL() throws IOException, WriteProcessException { Pair pair1 = - generateTsFileAndObject(true, System.currentTimeMillis() + 100000); + generateTsFileAndObject(true, System.currentTimeMillis() + 100000, 1); Pair pair2 = - generateTsFileAndObject(false, System.currentTimeMillis() - 1000000); + generateTsFileAndObject(false, System.currentTimeMillis() - 1000000, 2); tsFileManager.add(pair1.getLeft(), true); tsFileManager.add(pair2.getLeft(), false); CrossSpaceCompactionTask task = @@ -211,9 +215,9 @@ public void testCrossCompactionWithTTL() throws IOException, WriteProcessExcepti @Test public void testSettleCompaction() throws IOException, WriteProcessException { Pair pair1 = - generateTsFileAndObject(true, System.currentTimeMillis() - 10000); + generateTsFileAndObject(true, System.currentTimeMillis() - 10000, 3); Pair pair2 = - generateTsFileAndObject(true, System.currentTimeMillis() + 1000000); + generateTsFileAndObject(true, System.currentTimeMillis() + 1000000, 0); tsFileManager.add(pair1.getLeft(), true); tsFileManager.add(pair2.getLeft(), true); SettleCompactionTask task = @@ -230,19 +234,19 @@ public void testSettleCompaction() throws IOException, WriteProcessException { Assert.assertTrue(pair2.getRight().exists()); } - private Pair generateTsFileAndObject(boolean seq, long timestamp) + private Pair generateTsFileAndObject(boolean seq, long timestamp, int regionIdInTsFile) throws IOException, WriteProcessException { TsFileResource resource = createEmptyFileAndResource(seq); - Path testFile1 = Files.createTempFile(objectDir.toPath(), "test_", ".bin"); + Path testFile1 = Files.createTempFile(regionDir.toPath(), "test_", ".bin"); byte[] content = new byte[100]; for (int i = 0; i < 100; i++) { content[i] = (byte) i; } Files.write(testFile1, content); - String relativePath = testFile1.toFile().getName(); - ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + relativePath.length()); + String relativePathInTsFile = regionIdInTsFile + File.separator + testFile1.toFile().getName(); + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + relativePathInTsFile.length()); buffer.putLong(100L); - buffer.put(BytesUtils.stringToBytes(relativePath)); + buffer.put(BytesUtils.stringToBytes(relativePathInTsFile)); buffer.flip(); IDeviceID deviceID = new StringArrayDeviceID("t1", "d1"); try (TsFileIOWriter writer = new TsFileIOWriter(resource.getTsFile())) { From 9b5d746ee8dc532663b9633f5a4c2034b543807d Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 9 Dec 2025 18:12:30 +0800 Subject: [PATCH 11/15] spotless --- .../dataregion/read/reader/chunk/DiskChunkLoader.java | 10 ++++++++++ .../compaction/object/ObjectTypeCompactionTest.java | 4 ++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskChunkLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskChunkLoader.java index be33428ae65f..a2ff24233d9f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskChunkLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskChunkLoader.java @@ -24,7 +24,9 @@ import org.apache.iotdb.db.storageengine.buffer.ChunkCache; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.utils.ObjectTypeUtils; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.read.common.Chunk; @@ -85,6 +87,14 @@ public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter globalTi chunkMetaData.getStatistics(), context); + final TsFileID tsFileID = getTsFileID(); + if (tsFileID.regionId > 0 && chunkMetaData.getDataType() == TSDataType.OBJECT) { + chunk + .getHeader() + .setReplaceDecoder( + decoder -> ObjectTypeUtils.getReplaceDecoder(decoder, tsFileID.regionId)); + } + long t2 = System.nanoTime(); IChunkReader chunkReader = new ChunkReader(chunk, globalTimeFilter); SeriesScanCostMetricSet.getInstance() diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java index 9469fc1b118a..d5874b70c34e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java @@ -234,8 +234,8 @@ public void testSettleCompaction() throws IOException, WriteProcessException { Assert.assertTrue(pair2.getRight().exists()); } - private Pair generateTsFileAndObject(boolean seq, long timestamp, int regionIdInTsFile) - throws IOException, WriteProcessException { + private Pair generateTsFileAndObject( + boolean seq, long timestamp, int regionIdInTsFile) throws IOException, WriteProcessException { TsFileResource resource = createEmptyFileAndResource(seq); Path testFile1 = Files.createTempFile(regionDir.toPath(), "test_", ".bin"); byte[] content = new byte[100]; From 7eab1a9be41ad15737ed6cff98439697bc933420 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 10 Dec 2025 09:49:24 +0800 Subject: [PATCH 12/15] spotless --- .../dataregion/compaction/object/ObjectTypeCompactionTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java index d5874b70c34e..28200597595c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java @@ -23,9 +23,9 @@ import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema; import org.apache.iotdb.commons.schema.table.column.TagColumnSchema; +import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; From a1acb530eb38cc441fecb6687e43dc5bd3941ee2 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 10 Dec 2025 10:01:17 +0800 Subject: [PATCH 13/15] fix rebase --- .../planner/plan/node/write/ObjectNode.java | 7 +- .../storageengine/dataregion/IObjectPath.java | 9 ++- .../iotdb/db/utils/ObjectTypeUtils.java | 80 +++++++++++++------ .../object/ObjectTypeCompactionTest.java | 43 ++++++++++ 4 files changed, 107 insertions(+), 32 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java index 15c24bd3794e..4ec37e46bf25 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java @@ -30,7 +30,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; -import org.apache.iotdb.db.storageengine.dataregion.IObjectPath.Deserializer; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType; @@ -134,7 +133,7 @@ public static ObjectNode deserializeFromWAL(DataInputStream stream) throws IOExc long searchIndex = stream.readLong(); boolean isEOF = stream.readByte() == 1; long offset = stream.readLong(); - IObjectPath filePath = Deserializer.DESERIALIZER.deserializeFrom(stream); + IObjectPath filePath = IObjectPath.getDeserializer().deserializeFrom(stream); int contentLength = stream.readInt(); ObjectNode objectNode = new ObjectNode(isEOF, offset, contentLength, filePath); objectNode.setSearchIndex(searchIndex); @@ -145,7 +144,7 @@ public static ObjectNode deserializeFromWAL(ByteBuffer buffer) { long searchIndex = buffer.getLong(); boolean isEOF = buffer.get() == 1; long offset = buffer.getLong(); - IObjectPath filePath = Deserializer.DESERIALIZER.deserializeFrom(buffer); + IObjectPath filePath = IObjectPath.getDeserializer().deserializeFrom(buffer); Optional objectFile = TierManager.getInstance().getAbsoluteObjectFilePath(filePath.toString()); int contentLength = buffer.getInt(); @@ -169,7 +168,7 @@ public static ObjectNode deserializeFromWAL(ByteBuffer buffer) { public static ObjectNode deserialize(ByteBuffer byteBuffer) { boolean isEoF = ReadWriteIOUtils.readBool(byteBuffer); long offset = ReadWriteIOUtils.readLong(byteBuffer); - IObjectPath filePath = Deserializer.DESERIALIZER.deserializeFrom(byteBuffer); + IObjectPath filePath = IObjectPath.getDeserializer().deserializeFrom(byteBuffer); int contentLength = ReadWriteIOUtils.readInt(byteBuffer); byte[] content = ReadWriteIOUtils.readBytes(byteBuffer, contentLength); return new ObjectNode(isEoF, offset, content, filePath); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java index f07aaf9037b6..c340aae440c0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java @@ -60,10 +60,11 @@ interface Deserializer { IObjectPath deserializeFrom(InputStream inputStream) throws IOException; IObjectPath deserializeFromObjectValue(ByteBuffer byteBuffer); + } - Deserializer DESERIALIZER = - CONFIG.getRestrictObjectLimit() - ? PlainObjectPath.getDESERIALIZER() - : Base32ObjectPath.getDESERIALIZER(); + static Deserializer getDeserializer() { + return CONFIG.getRestrictObjectLimit() + ? PlainObjectPath.getDESERIALIZER() + : Base32ObjectPath.getDESERIALIZER(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java index 60d73fd6b57c..562af848a303 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java @@ -30,7 +30,9 @@ import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.service.metrics.FileMetrics; +import org.apache.iotdb.db.storageengine.dataregion.Base32ObjectPath; import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; +import org.apache.iotdb.db.storageengine.dataregion.PlainObjectPath; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq; import org.apache.iotdb.rpc.TSStatusCode; @@ -38,8 +40,8 @@ import org.apache.tsfile.encoding.decoder.Decoder; import org.apache.tsfile.encoding.decoder.DecoderWrapper; import org.apache.tsfile.utils.Binary; -import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +49,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -65,7 +66,8 @@ private ObjectTypeUtils() {} public static ByteBuffer readObjectContent( Binary binary, long offset, int length, boolean mayNotInCurrentNode) { - Pair objectLengthPathPair = ObjectTypeUtils.parseObjectBinary(binary); + Pair objectLengthPathPair = + ObjectTypeUtils.parseObjectBinaryToSizeStringPathPair(binary); long fileLength = objectLengthPathPair.getLeft(); String relativePath = objectLengthPathPair.getRight(); int actualReadSize = @@ -152,12 +154,12 @@ private static ByteBuffer readObjectContentFromRemoteFile( return buffer; } - public static Binary generateObjectBinary(long objectSize, String relativePath) { - byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8); - byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES]; - System.arraycopy(BytesUtils.longToBytes(objectSize), 0, valueBytes, 0, Long.BYTES); - System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length); - return new Binary(valueBytes); + public static Binary generateObjectBinary(long objectSize, IObjectPath objectPath) { + byte[] valueBytes = new byte[objectPath.getSerializeSizeToObjectValue() + Long.BYTES]; + ByteBuffer buffer = ByteBuffer.wrap(valueBytes); + ReadWriteIOUtils.write(objectSize, buffer); + objectPath.serializeToObjectValue(buffer); + return new Binary(buffer.array()); } public static DecoderWrapper getReplaceDecoder(final Decoder decoder, final int newRegionId) { @@ -176,20 +178,42 @@ public ObjectRegionIdReplaceDecoder(Decoder decoder, int newRegionId) { @Override public Binary readBinary(ByteBuffer buffer) { Binary originValue = originDecoder.readBinary(buffer); - Pair pair = ObjectTypeUtils.parseObjectBinary(originValue); - try { - Path path = Paths.get(pair.getRight()); - int regionId = Integer.parseInt(path.getName(0).toString()); - if (regionId == newRegionId) { - return originValue; + return ObjectTypeUtils.replaceRegionIdForObjectBinary(newRegionId, originValue); + } + } + + public static Binary replaceRegionIdForObjectBinary(int newRegionId, Binary originValue) { + Pair pair = + ObjectTypeUtils.parseObjectBinaryToSizeIObjectPathPair(originValue); + IObjectPath objectPath = pair.getRight(); + try { + Path path; + if (objectPath instanceof PlainObjectPath) { + path = Paths.get(objectPath.toString()); + } else { + path = ((Base32ObjectPath) objectPath).getPath(); + } + int regionId = Integer.parseInt(path.getName(0).toString()); + if (regionId == newRegionId) { + return originValue; + } + IObjectPath newObjectPath; + if (objectPath instanceof PlainObjectPath) { + String newPath = objectPath.toString().replaceFirst(regionId + "", newRegionId + ""); + newObjectPath = new PlainObjectPath(newPath); + } else { + String[] subPath = new String[path.getNameCount() - 1]; + for (int i = 1; i < path.getNameCount(); i++) { + subPath[i - 1] = path.getName(i).toString(); } - String newPath = pair.getRight().replaceFirst(regionId + "", newRegionId + ""); - return ObjectTypeUtils.generateObjectBinary(pair.getLeft(), newPath); - } catch (NumberFormatException e) { - throw new IoTDBRuntimeException( - "wrong object file path: " + pair.getRight(), - TSStatusCode.OBJECT_READ_ERROR.getStatusCode()); + Path newPath = Paths.get(newRegionId + "", subPath); + newObjectPath = new Base32ObjectPath(newPath); } + return ObjectTypeUtils.generateObjectBinary(pair.getLeft(), newObjectPath); + } catch (NumberFormatException e) { + throw new IoTDBRuntimeException( + "wrong object file path: " + pair.getRight(), + TSStatusCode.OBJECT_READ_ERROR.getStatusCode()); } } @@ -210,15 +234,23 @@ public static int getActualReadSize(String filePath, long fileSize, long offset, return (int) actualReadSize; } - public static Pair parseObjectBinary(Binary binary) { + public static Pair parseObjectBinaryToSizeStringPathPair(Binary binary) { byte[] bytes = binary.getValues(); ByteBuffer buffer = ByteBuffer.wrap(bytes); long length = buffer.getLong(); String relativeObjectFilePath = - IObjectPath.Deserializer.DESERIALIZER.deserializeFromObjectValue(buffer).toString(); + IObjectPath.getDeserializer().deserializeFromObjectValue(buffer).toString(); return new Pair<>(length, relativeObjectFilePath); } + public static Pair parseObjectBinaryToSizeIObjectPathPair(Binary binary) { + byte[] bytes = binary.getValues(); + ByteBuffer buffer = ByteBuffer.wrap(bytes); + long length = buffer.getLong(); + IObjectPath objectPath = IObjectPath.getDeserializer().deserializeFromObjectValue(buffer); + return new Pair<>(length, objectPath); + } + public static long getObjectLength(Binary binary) { byte[] bytes = binary.getValues(); ByteBuffer wrap = ByteBuffer.wrap(bytes); @@ -230,7 +262,7 @@ public static Optional getNullableObjectPathFromBinary( byte[] bytes = binary.getValues(); ByteBuffer buffer = ByteBuffer.wrap(bytes, 8, bytes.length - 8); String relativeObjectFilePath = - IObjectPath.Deserializer.DESERIALIZER.deserializeFromObjectValue(buffer).toString(); + IObjectPath.getDeserializer().deserializeFromObjectValue(buffer).toString(); return TIER_MANAGER.getAbsoluteObjectFilePath(relativeObjectFilePath, needTempFile); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java index 28200597595c..4fd7e8c432d9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java @@ -29,6 +29,9 @@ import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; +import org.apache.iotdb.db.storageengine.dataregion.Base32ObjectPath; +import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; +import org.apache.iotdb.db.storageengine.dataregion.PlainObjectPath; import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; @@ -39,6 +42,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; +import org.apache.iotdb.db.utils.ObjectTypeUtils; import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; @@ -234,6 +238,45 @@ public void testSettleCompaction() throws IOException, WriteProcessException { Assert.assertTrue(pair2.getRight().exists()); } + @Test + public void testPlainObjectBinaryReplaceRegionId() { + IObjectPath objectPath = new PlainObjectPath(1, 0, new StringArrayDeviceID("t1.d1"), "s1"); + ByteBuffer buffer = + ByteBuffer.allocate(Long.BYTES + objectPath.getSerializeSizeToObjectValue()); + buffer.putLong(10); + objectPath.serializeToObjectValue(buffer); + + Binary origin = new Binary(buffer.array()); + Binary result = ObjectTypeUtils.replaceRegionIdForObjectBinary(10, origin); + ByteBuffer deserializeBuffer = ByteBuffer.wrap(result.getValues()); + deserializeBuffer.getLong(); + Assert.assertEquals( + new PlainObjectPath(10, 0, new StringArrayDeviceID("t1.d1"), "s1").toString(), + IObjectPath.getDeserializer().deserializeFromObjectValue(deserializeBuffer).toString()); + } + + @Test + public void testBase32ObjectBinaryReplaceRegionId() { + config.setRestrictObjectLimit(false); + try { + IObjectPath objectPath = new Base32ObjectPath(1, 0, new StringArrayDeviceID("t1.d1"), "s1"); + ByteBuffer buffer = + ByteBuffer.allocate(Long.BYTES + objectPath.getSerializeSizeToObjectValue()); + buffer.putLong(10); + objectPath.serializeToObjectValue(buffer); + + Binary origin = new Binary(buffer.array()); + Binary result = ObjectTypeUtils.replaceRegionIdForObjectBinary(10, origin); + ByteBuffer deserializeBuffer = ByteBuffer.wrap(result.getValues()); + deserializeBuffer.getLong(); + Assert.assertEquals( + new Base32ObjectPath(10, 0, new StringArrayDeviceID("t1.d1"), "s1").toString(), + IObjectPath.getDeserializer().deserializeFromObjectValue(deserializeBuffer).toString()); + } finally { + config.setRestrictObjectLimit(true); + } + } + private Pair generateTsFileAndObject( boolean seq, long timestamp, int regionIdInTsFile) throws IOException, WriteProcessException { TsFileResource resource = createEmptyFileAndResource(seq); From 9820095420b38ec7da500aaa2a1ac68637fbc27a Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 10 Dec 2025 10:10:10 +0800 Subject: [PATCH 14/15] fix rebase --- .../column/unary/scalar/ReadObjectColumnTransformer.java | 3 ++- .../db/storageengine/dataregion/Base32ObjectPath.java | 8 ++++++++ .../db/storageengine/dataregion/PlainObjectPath.java | 2 +- .../queryengine/plan/function/RecordObjectTypeTest.java | 3 ++- 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java index 3049a9bc4415..a4ad4e25756a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java @@ -100,7 +100,8 @@ private void transform(Column column, ColumnBuilder columnBuilder, int i) { } private Binary readObject(Binary binary) { - Pair objectLengthPathPair = ObjectTypeUtils.parseObjectBinary(binary); + Pair objectLengthPathPair = + ObjectTypeUtils.parseObjectBinaryToSizeStringPathPair(binary); long fileLength = objectLengthPathPair.getLeft(); String relativePath = objectLengthPathPair.getRight(); int actualReadSize = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java index c9f2b6b5637d..d0ea395502a9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java @@ -61,6 +61,10 @@ private Base32ObjectPath(String first, String... more) { path = Paths.get(first, more); } + public Base32ObjectPath(Path path) { + this.path = path; + } + public Base32ObjectPath(int regionId, long time, IDeviceID iDeviceID, String measurement) { Object[] segments = iDeviceID.getSegments(); String[] pathSegments = new String[segments.length + 2]; @@ -151,6 +155,10 @@ public String toString() { return path.toString(); } + public Path getPath() { + return path; + } + public static Factory getFACTORY() { return FACTORY; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java index 77818f9a3909..3ae3a925a3c9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java @@ -53,7 +53,7 @@ public IObjectPath deserializeFromObjectValue(ByteBuffer byteBuffer) { private static final Factory FACTORY = PlainObjectPath::new; - private PlainObjectPath(String filePath) { + public PlainObjectPath(String filePath) { this.filePath = filePath; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java index 17c3f13e36b4..1c44cb875430 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java @@ -114,7 +114,8 @@ private void testRecordIterator(Iterator recordIterator) { Object object = record.getObject(0); Assert.assertTrue(object instanceof Binary); - Pair pair = ObjectTypeUtils.parseObjectBinary((Binary) object); + Pair pair = + ObjectTypeUtils.parseObjectBinaryToSizeStringPathPair((Binary) object); Assert.assertEquals(Long.valueOf(100L), pair.getLeft()); Assert.assertTrue(pair.getRight().startsWith("test_") && pair.getRight().endsWith(".bin")); From f6e68bdee3b56adef37c1cc5145c83a6f096ea06 Mon Sep 17 00:00:00 2001 From: HTHou Date: Mon, 15 Dec 2025 14:25:05 +0800 Subject: [PATCH 15/15] Fix review --- .../plan/node/write/RelationalInsertRowsNode.java | 9 ++++----- .../plan/node/write/RelationalInsertTabletNode.java | 9 ++++----- .../queryengine/plan/function/RecordObjectTypeTest.java | 2 +- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java index 920b696abe62..6eeb3d7322fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java @@ -43,6 +43,8 @@ import java.util.List; import java.util.Map; +import static org.apache.iotdb.db.utils.ObjectTypeUtils.generateObjectBinary; + public class RelationalInsertRowsNode extends InsertRowsNode { // deviceId cache for Table-view insertion private IDeviceID[] deviceIDs; @@ -223,11 +225,8 @@ private void handleObjectValue( objectNode.setDataRegionReplicaSet(dataRegionReplicaSet); writePlanNodeList.add(objectNode); if (isEoF) { - ByteBuffer valueBytes = - ByteBuffer.allocate(relativePath.getSerializeSizeToObjectValue() + Long.BYTES); - valueBytes.putLong(offset + content.length); - relativePath.serializeToObjectValue(valueBytes); - ((Binary) values[j]).setValues(valueBytes.array()); + ((Binary) values[j]) + .setValues(generateObjectBinary(offset + content.length, relativePath).getValues()); insertRowNode.setValues(values); } else { values[j] = null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index 1f6a8011686a..3255c11f1e96 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -55,6 +55,8 @@ import java.util.Map; import java.util.Map.Entry; +import static org.apache.iotdb.db.utils.ObjectTypeUtils.generateObjectBinary; + public class RelationalInsertTabletNode extends InsertTabletNode { // deviceId cache for Table-view insertion @@ -478,11 +480,8 @@ private void handleObjectValue( objectNode.setDataRegionReplicaSet(entry.getKey()); result.add(objectNode); if (isEoF) { - ByteBuffer valueBytes = - ByteBuffer.allocate(relativePath.getSerializeSizeToObjectValue() + Long.BYTES); - valueBytes.putLong(offset + content.length); - relativePath.serializeToObjectValue(valueBytes); - ((Binary[]) columns[column])[j] = new Binary(valueBytes.array()); + ((Binary[]) columns[column])[j] = + generateObjectBinary(offset + content.length, relativePath); } else { ((Binary[]) columns[column])[j] = null; if (bitMaps == null) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java index bc5ee088a070..98f2428e8d58 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java @@ -78,7 +78,7 @@ public void tearDown() throws IOException { Files.delete(file.toPath()); } } - config.setRestrictObjectLimit(true); + config.setRestrictObjectLimit(false); } @Test