diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 6cb7e0082426..e7f4b228d4f1 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -651,6 +651,12 @@ public CommonConfig setAuditableOperationResult(String auditableOperationResult) return this; } + @Override + public CommonConfig setRestrictObjectLimit(boolean restrictObjectLimit) { + setProperty("restrict_object_limit", String.valueOf(restrictObjectLimit)); + return this; + } + // For part of the log directory public String getClusterConfigStr() { return fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS)) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 1b4801f56922..abb0f8bf8bb0 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -684,4 +684,11 @@ public CommonConfig setAuditableOperationResult(String auditableOperationResult) cnConfig.setAuditableOperationResult(auditableOperationResult); return this; } + + @Override + public CommonConfig setRestrictObjectLimit(boolean restrictObjectLimit) { + cnConfig.setRestrictObjectLimit(restrictObjectLimit); + dnConfig.setRestrictObjectLimit(restrictObjectLimit); + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java index f08c085bc6c4..96a0fbe27e05 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java @@ -169,6 +169,10 @@ public String getSystemDir() { return getDataNodeDir() + File.separator + "system"; } + public String getDataNodeObjectDir() { + return getDataNodeDir() + File.separator + "data" + File.separator + "object"; + } + @Override protected MppJVMConfig initVMConfig() { return MppJVMConfig.builder() diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index fdcff20dbc85..148046423cd0 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -477,4 +477,9 @@ public CommonConfig setAuditableOperationLevel(String auditableOperationLevel) { public CommonConfig setAuditableOperationResult(String auditableOperationResult) { return this; } + + @Override + public CommonConfig setRestrictObjectLimit(boolean restrictObjectLimit) { + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index f27bdc8d66da..531f94eec4b1 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -211,4 +211,6 @@ default CommonConfig setDefaultDatabaseLevel(int defaultDatabaseLevel) { CommonConfig setAuditableOperationLevel(String auditableOperationLevel); CommonConfig setAuditableOperationResult(String auditableOperationResult); + + CommonConfig setRestrictObjectLimit(boolean restrictObjectLimit); } diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT.java new file mode 100644 index 000000000000..08946bf4cceb --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.relational.it.session; + +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.rpc.TSStatusCode; + +import com.google.common.io.BaseEncoding; +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BytesUtils; +import org.apache.tsfile.write.record.Tablet; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertNull; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBObjectInsertIT { + + @BeforeClass + public static void classSetUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + } + + @Before + public void setUp() throws Exception { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS db1"); + } + } + + @After + public void tearDown() throws Exception { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("DROP DATABASE IF EXISTS db1"); + } + } + + @AfterClass + public static void classTearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void insertObjectTest() + throws IoTDBConnectionException, StatementExecutionException, IOException { + String testObject = + System.getProperty("user.dir") + + File.separator + + "target" + + File.separator + + "test-classes" + + File.separator + + "object-example.pt"; + File object = new File(testObject); + + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("USE \"db1\""); + // insert table data by tablet + List columnNameList = + Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file"); + List dataTypeList = + Arrays.asList( + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.FLOAT, + TSDataType.OBJECT); + List columnTypeList = + new ArrayList<>( + Arrays.asList( + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.FIELD, + ColumnCategory.FIELD)); + Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, columnTypeList, 1); + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, 1); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue(rowIndex, 4, true, 0, Files.readAllBytes(Paths.get(testObject))); + session.insert(tablet); + tablet.reset(); + + try (SessionDataSet dataSet = + session.executeQueryStatement("select file from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + Assert.assertEquals( + BytesUtils.parseObjectByteArrayToString(BytesUtils.longToBytes(object.length())), + iterator.getString(1)); + } + } + + try (SessionDataSet dataSet = + session.executeQueryStatement( + "select READ_OBJECT(file) from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + Binary binary = iterator.getBlob(1); + Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), binary.getValues()); + } + } + } + // test object file path + boolean success = false; + for (DataNodeWrapper dataNodeWrapper : EnvFactory.getEnv().getDataNodeWrapperList()) { + String objectDirStr = dataNodeWrapper.getDataNodeObjectDir(); + File objectDir = new File(objectDirStr); + if (objectDir.exists() && objectDir.isDirectory()) { + File[] regionDirs = objectDir.listFiles(); + if (regionDirs != null) { + for (File regionDir : regionDirs) { + if (regionDir.isDirectory()) { + File objectFile = + new File( + regionDir, + convertPathString("object_table") + + File.separator + + convertPathString("1") + + File.separator + + convertPathString("5") + + File.separator + + convertPathString("3") + + File.separator + + convertPathString("file") + + File.separator + + "1.bin"); + if (objectFile.exists() && objectFile.isFile()) { + success = true; + } + } + } + } + } + } + Assert.assertTrue(success); + } + + @Test + public void insertObjectSegmentsTest() + throws IoTDBConnectionException, StatementExecutionException, IOException { + String testObject = + System.getProperty("user.dir") + + File.separator + + "target" + + File.separator + + "test-classes" + + File.separator + + "object-example.pt"; + byte[] objectBytes = Files.readAllBytes(Paths.get(testObject)); + List objectSegments = new ArrayList<>(); + for (int i = 0; i < objectBytes.length; i += 512) { + objectSegments.add(Arrays.copyOfRange(objectBytes, i, Math.min(i + 512, objectBytes.length))); + } + + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("USE \"db1\""); + // insert table data by tablet + List columnNameList = + Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file"); + List dataTypeList = + Arrays.asList( + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.FLOAT, + TSDataType.OBJECT); + List columnTypeList = + new ArrayList<>( + Arrays.asList( + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.FIELD, + ColumnCategory.FIELD)); + Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, columnTypeList, 1); + for (int i = 0; i < objectSegments.size() - 1; i++) { + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, 1); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue(rowIndex, 4, false, i * 512L, objectSegments.get(i)); + session.insert(tablet); + tablet.reset(); + } + + try (SessionDataSet dataSet = + session.executeQueryStatement("select file from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + assertNull(iterator.getString(1)); + } + } + + // insert segment with wrong offset + try { + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, 1); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue(rowIndex, 4, false, 512L, objectSegments.get(1)); + session.insert(tablet); + } catch (StatementExecutionException e) { + Assert.assertEquals(TSStatusCode.OBJECT_INSERT_ERROR.getStatusCode(), e.getStatusCode()); + Assert.assertEquals( + String.format( + "741: The file length %d is not equal to the offset %d", + ((objectSegments.size() - 1) * 512), 512L), + e.getMessage()); + } finally { + tablet.reset(); + } + + // last segment + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, 1); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue( + rowIndex, + 4, + true, + (objectSegments.size() - 1) * 512L, + objectSegments.get(objectSegments.size() - 1)); + session.insert(tablet); + tablet.reset(); + + try (SessionDataSet dataSet = + session.executeQueryStatement( + "select READ_OBJECT(file) from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + Binary binary = iterator.getBlob(1); + Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), binary.getValues()); + } + } + } + + // test object file path + boolean success = false; + for (DataNodeWrapper dataNodeWrapper : EnvFactory.getEnv().getDataNodeWrapperList()) { + String objectDirStr = dataNodeWrapper.getDataNodeObjectDir(); + File objectDir = new File(objectDirStr); + if (objectDir.exists() && objectDir.isDirectory()) { + File[] regionDirs = objectDir.listFiles(); + if (regionDirs != null) { + for (File regionDir : regionDirs) { + if (regionDir.isDirectory()) { + File objectFile = + new File( + regionDir, + convertPathString("object_table") + + File.separator + + convertPathString("1") + + File.separator + + convertPathString("5") + + File.separator + + convertPathString("3") + + File.separator + + convertPathString("file") + + File.separator + + "1.bin"); + if (objectFile.exists() && objectFile.isFile()) { + success = true; + } + } + } + } + } + } + Assert.assertTrue(success); + } + + protected String convertPathString(String path) { + return BaseEncoding.base32().omitPadding().encode(path.getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT2.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT2.java new file mode 100644 index 000000000000..9bd16938f563 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectInsertIT2.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.session; + +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BytesUtils; +import org.apache.tsfile.write.record.Tablet; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBObjectInsertIT2 extends IoTDBObjectInsertIT { + + @BeforeClass + public static void classSetUp() throws Exception { + EnvFactory.getEnv().getConfig().getCommonConfig().setRestrictObjectLimit(true); + EnvFactory.getEnv().initClusterEnvironment(); + } + + @AfterClass + public static void classTearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void changeRestrictObjectLimitTest() + throws IoTDBConnectionException, StatementExecutionException, IOException { + EnvFactory.getEnv().getConfig().getCommonConfig().setRestrictObjectLimit(false); + TestUtils.restartCluster(EnvFactory.getEnv()); + String testObject = + System.getProperty("user.dir") + + File.separator + + "target" + + File.separator + + "test-classes" + + File.separator + + "object-example.pt"; + File object = new File(testObject); + + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("USE \"db1\""); + // insert table data by tablet + List columnNameList = + Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file"); + List dataTypeList = + Arrays.asList( + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.FLOAT, + TSDataType.OBJECT); + List columnTypeList = + new ArrayList<>( + Arrays.asList( + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.FIELD, + ColumnCategory.FIELD)); + Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, columnTypeList, 1); + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, 1); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue(rowIndex, 4, true, 0, Files.readAllBytes(Paths.get(testObject))); + session.insert(tablet); + tablet.reset(); + + try (SessionDataSet dataSet = + session.executeQueryStatement("select file from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + Assert.assertEquals( + BytesUtils.parseObjectByteArrayToString(BytesUtils.longToBytes(object.length())), + iterator.getString(1)); + } + } + + try (SessionDataSet dataSet = + session.executeQueryStatement( + "select READ_OBJECT(file) from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + Binary binary = iterator.getBlob(1); + Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), binary.getValues()); + } + } + } + // test object file path + boolean success = false; + for (DataNodeWrapper dataNodeWrapper : EnvFactory.getEnv().getDataNodeWrapperList()) { + String objectDirStr = dataNodeWrapper.getDataNodeObjectDir(); + File objectDir = new File(objectDirStr); + if (objectDir.exists() && objectDir.isDirectory()) { + File[] regionDirs = objectDir.listFiles(); + if (regionDirs != null) { + for (File regionDir : regionDirs) { + if (regionDir.isDirectory()) { + File objectFile = + new File( + regionDir, + convertPathString("object_table") + + File.separator + + convertPathString("1") + + File.separator + + convertPathString("5") + + File.separator + + convertPathString("3") + + File.separator + + convertPathString("file") + + File.separator + + "1.bin"); + if (objectFile.exists() && objectFile.isFile()) { + success = true; + } + } + } + } + } + } + Assert.assertTrue(success); + } + + @Override + protected String convertPathString(String path) { + return path; + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java index 49884e3c70ba..9c02ac94208a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java @@ -36,7 +36,6 @@ import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.session.TableSessionBuilder; import org.apache.tsfile.enums.ColumnCategory; @@ -45,7 +44,6 @@ import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.read.common.RowRecord; import org.apache.tsfile.utils.Binary; -import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -63,8 +61,6 @@ import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Paths; import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; @@ -851,179 +847,6 @@ public void insertRelationalTabletWithCacheLeaderTest() } } - @Test - public void insertObjectTest() - throws IoTDBConnectionException, StatementExecutionException, IOException { - String testObject = - System.getProperty("user.dir") - + File.separator - + "target" - + File.separator - + "test-classes" - + File.separator - + "object-example.pt"; - File object = new File(testObject); - - try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { - session.executeNonQueryStatement("USE \"db1\""); - // insert table data by tablet - List columnNameList = - Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file"); - List dataTypeList = - Arrays.asList( - TSDataType.STRING, - TSDataType.STRING, - TSDataType.STRING, - TSDataType.FLOAT, - TSDataType.OBJECT); - List columnTypeList = - new ArrayList<>( - Arrays.asList( - ColumnCategory.TAG, - ColumnCategory.TAG, - ColumnCategory.TAG, - ColumnCategory.FIELD, - ColumnCategory.FIELD)); - Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, columnTypeList, 1); - int rowIndex = tablet.getRowSize(); - tablet.addTimestamp(rowIndex, 1); - tablet.addValue(rowIndex, 0, "1"); - tablet.addValue(rowIndex, 1, "5"); - tablet.addValue(rowIndex, 2, "3"); - tablet.addValue(rowIndex, 3, 37.6F); - tablet.addValue(rowIndex, 4, true, 0, Files.readAllBytes(Paths.get(testObject))); - session.insert(tablet); - tablet.reset(); - - try (SessionDataSet dataSet = - session.executeQueryStatement("select file from object_table where time = 1")) { - SessionDataSet.DataIterator iterator = dataSet.iterator(); - while (iterator.next()) { - Assert.assertEquals( - BytesUtils.parseObjectByteArrayToString(BytesUtils.longToBytes(object.length())), - iterator.getString(1)); - } - } - - try (SessionDataSet dataSet = - session.executeQueryStatement( - "select READ_OBJECT(file) from object_table where time = 1")) { - SessionDataSet.DataIterator iterator = dataSet.iterator(); - while (iterator.next()) { - Binary binary = iterator.getBlob(1); - Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), binary.getValues()); - } - } - } - } - - @Test - public void insertObjectSegmentsTest() - throws IoTDBConnectionException, StatementExecutionException, IOException { - String testObject = - System.getProperty("user.dir") - + File.separator - + "target" - + File.separator - + "test-classes" - + File.separator - + "object-example.pt"; - byte[] objectBytes = Files.readAllBytes(Paths.get(testObject)); - List objectSegments = new ArrayList<>(); - for (int i = 0; i < objectBytes.length; i += 512) { - objectSegments.add(Arrays.copyOfRange(objectBytes, i, Math.min(i + 512, objectBytes.length))); - } - - try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { - session.executeNonQueryStatement("USE \"db1\""); - // insert table data by tablet - List columnNameList = - Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file"); - List dataTypeList = - Arrays.asList( - TSDataType.STRING, - TSDataType.STRING, - TSDataType.STRING, - TSDataType.FLOAT, - TSDataType.OBJECT); - List columnTypeList = - new ArrayList<>( - Arrays.asList( - ColumnCategory.TAG, - ColumnCategory.TAG, - ColumnCategory.TAG, - ColumnCategory.FIELD, - ColumnCategory.FIELD)); - Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, columnTypeList, 1); - for (int i = 0; i < objectSegments.size() - 1; i++) { - int rowIndex = tablet.getRowSize(); - tablet.addTimestamp(rowIndex, 1); - tablet.addValue(rowIndex, 0, "1"); - tablet.addValue(rowIndex, 1, "5"); - tablet.addValue(rowIndex, 2, "3"); - tablet.addValue(rowIndex, 3, 37.6F); - tablet.addValue(rowIndex, 4, false, i * 512L, objectSegments.get(i)); - session.insert(tablet); - tablet.reset(); - } - - try (SessionDataSet dataSet = - session.executeQueryStatement("select file from object_table where time = 1")) { - SessionDataSet.DataIterator iterator = dataSet.iterator(); - while (iterator.next()) { - assertNull(iterator.getString(1)); - } - } - - // insert segment with wrong offset - try { - int rowIndex = tablet.getRowSize(); - tablet.addTimestamp(rowIndex, 1); - tablet.addValue(rowIndex, 0, "1"); - tablet.addValue(rowIndex, 1, "5"); - tablet.addValue(rowIndex, 2, "3"); - tablet.addValue(rowIndex, 3, 37.6F); - tablet.addValue(rowIndex, 4, false, 512L, objectSegments.get(1)); - session.insert(tablet); - } catch (StatementExecutionException e) { - Assert.assertEquals(TSStatusCode.OBJECT_INSERT_ERROR.getStatusCode(), e.getStatusCode()); - Assert.assertEquals( - String.format( - "741: The file length %d is not equal to the offset %d", - ((objectSegments.size() - 1) * 512), 512L), - e.getMessage()); - } finally { - tablet.reset(); - } - - // last segment - int rowIndex = tablet.getRowSize(); - tablet.addTimestamp(rowIndex, 1); - tablet.addValue(rowIndex, 0, "1"); - tablet.addValue(rowIndex, 1, "5"); - tablet.addValue(rowIndex, 2, "3"); - tablet.addValue(rowIndex, 3, 37.6F); - tablet.addValue( - rowIndex, - 4, - true, - (objectSegments.size() - 1) * 512L, - objectSegments.get(objectSegments.size() - 1)); - session.insert(tablet); - tablet.reset(); - - try (SessionDataSet dataSet = - session.executeQueryStatement( - "select READ_OBJECT(file) from object_table where time = 1")) { - SessionDataSet.DataIterator iterator = dataSet.iterator(); - while (iterator.next()) { - Binary binary = iterator.getBlob(1); - Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), binary.getValues()); - } - } - } - } - @Test public void autoCreateNontagColumnTest() throws IoTDBConnectionException, StatementExecutionException { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 88e8d76001dc..34b69382b8fd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -316,6 +316,8 @@ public class ConfigNodeConfig { private long forceWalPeriodForConfigNodeSimpleInMs = 100; + private boolean restrictObjectLimit = false; + public ConfigNodeConfig() { // empty constructor } @@ -1275,4 +1277,12 @@ public long getFailureDetectorPhiAcceptablePauseInMs() { public void setFailureDetectorPhiAcceptablePauseInMs(long failureDetectorPhiAcceptablePauseInMs) { this.failureDetectorPhiAcceptablePauseInMs = failureDetectorPhiAcceptablePauseInMs; } + + public boolean getRestrictObjectLimit() { + return restrictObjectLimit; + } + + public void setRestrictObjectLimit(boolean restrictObjectLimit) { + this.restrictObjectLimit = restrictObjectLimit; + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 0ea7a278732e..0d3968d3f7a2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -368,6 +368,11 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio readConsistencyLevel)); } + conf.setRestrictObjectLimit( + Boolean.parseBoolean( + properties.getProperty( + "restrict_object_limit", String.valueOf(conf.getRestrictObjectLimit())))); + // commons commonDescriptor.loadCommonProps(properties); commonDescriptor.initCommonConfigDir(conf.getSystemDir()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java index 529f15d06cd4..9cf6b2dac963 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java @@ -213,6 +213,16 @@ public static void checkSystemProperties() throws IOException { COMMON_CONFIG.setEnableGrantOption(enableGrantOption); } } + + if (systemProperties.getProperty("restrict_object_limit", null) != null) { + boolean restrictObjectLimit = + Boolean.parseBoolean(systemProperties.getProperty("restrict_object_limit")); + if (restrictObjectLimit != conf.getRestrictObjectLimit()) { + LOGGER.warn( + format, "restrict_object_limit", conf.getRestrictObjectLimit(), restrictObjectLimit); + conf.setRestrictObjectLimit(restrictObjectLimit); + } + } } /** @@ -286,6 +296,8 @@ public static void storeSystemParameters() throws IOException { "tag_attribute_total_size", String.valueOf(COMMON_CONFIG.getTagAttributeTotalSize())); systemProperties.setProperty( "enable_grant_option", String.valueOf(COMMON_CONFIG.getEnableGrantOption())); + systemProperties.setProperty( + "restrict_object_limit", String.valueOf(conf.getRestrictObjectLimit())); systemPropertiesHandler.overwrite(systemProperties); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index e8083b79b68a..234227286df3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -179,6 +179,7 @@ private void setGlobalConfig(ConfigurationResp dataSet) { globalConfig.setSchemaEngineMode(commonConfig.getSchemaEngineMode()); globalConfig.setTagAttributeTotalSize(commonConfig.getTagAttributeTotalSize()); globalConfig.setEnableGrantOption(commonConfig.getEnableGrantOption()); + globalConfig.setRestrictObjectLimit(configNodeConfig.getRestrictObjectLimit()); dataSet.setGlobalConfig(globalConfig); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 3f19a2101a80..6f861119d397 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1206,8 +1206,11 @@ public class IoTDBConfig { private ConcurrentHashMap tsFileDBToEncryptMap = new ConcurrentHashMap<>( Collections.singletonMap("root.__audit", new EncryptParameter("UNENCRYPTED", null))); + private long maxObjectSizeInByte = 4 * 1024 * 1024 * 1024L; + private boolean restrictObjectLimit = false; + IoTDBConfig() {} public int getMaxLogEntriesNumPerBatch() { @@ -4335,4 +4338,12 @@ public long getMaxObjectSizeInByte() { public void setMaxObjectSizeInByte(long maxObjectSizeInByte) { this.maxObjectSizeInByte = maxObjectSizeInByte; } + + public boolean getRestrictObjectLimit() { + return restrictObjectLimit; + } + + public void setRestrictObjectLimit(boolean restrictObjectLimit) { + this.restrictObjectLimit = restrictObjectLimit; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index e51f445c5678..6a1e8dbcccbe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -2746,6 +2746,7 @@ public void loadGlobalConfig(TGlobalConfig globalConfig) { conf.setSeriesPartitionExecutorClass(globalConfig.getSeriesPartitionExecutorClass()); conf.setSeriesPartitionSlotNum(globalConfig.getSeriesPartitionSlotNum()); conf.setReadConsistencyLevel(globalConfig.getReadConsistencyLevel()); + conf.setRestrictObjectLimit(globalConfig.isRestrictObjectLimit()); } public void loadRatisConfig(TRatisConfig ratisConfig) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java index 10fb9bc3443a..4ec37e46bf25 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java @@ -29,11 +29,11 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; +import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; -import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.tsfile.utils.PublicBAOS; @@ -60,7 +60,7 @@ public class ObjectNode extends SearchNode implements WALEntryValue { private byte[] content; - private String filePath; + private IObjectPath filePath; private final int contentLength; @@ -68,7 +68,7 @@ public class ObjectNode extends SearchNode implements WALEntryValue { private boolean isGeneratedByRemoteConsensusLeader; - public ObjectNode(boolean isEOF, long offset, byte[] content, String filePath) { + public ObjectNode(boolean isEOF, long offset, byte[] content, IObjectPath filePath) { super(new PlanNodeId("")); this.isEOF = isEOF; this.offset = offset; @@ -77,7 +77,7 @@ public ObjectNode(boolean isEOF, long offset, byte[] content, String filePath) { this.contentLength = content.length; } - public ObjectNode(boolean isEOF, long offset, int contentLength, String filePath) { + public ObjectNode(boolean isEOF, long offset, int contentLength, IObjectPath filePath) { super(new PlanNodeId("")); this.isEOF = isEOF; this.offset = offset; @@ -97,12 +97,12 @@ public long getOffset() { return offset; } - public void setFilePath(String filePath) { + public void setFilePath(IObjectPath filePath) { this.filePath = filePath; } - public String getFilePath() { - return filePath; + public String getFilePathString() { + return filePath.toString(); } @Override @@ -111,7 +111,11 @@ public void serializeToWAL(IWALByteBufferView buffer) { buffer.putLong(searchIndex); buffer.put((byte) (isEOF ? 1 : 0)); buffer.putLong(offset); - WALWriteUtils.write(filePath, buffer); + try { + filePath.serialize(buffer); + } catch (IOException e) { + throw new RuntimeException(e); + } buffer.putInt(content.length); } @@ -122,14 +126,14 @@ public int serializedSize() { + Byte.BYTES + Long.BYTES + Integer.BYTES - + ReadWriteIOUtils.sizeToWrite(filePath); + + filePath.getSerializedSize(); } public static ObjectNode deserializeFromWAL(DataInputStream stream) throws IOException { long searchIndex = stream.readLong(); boolean isEOF = stream.readByte() == 1; long offset = stream.readLong(); - String filePath = ReadWriteIOUtils.readString(stream); + IObjectPath filePath = IObjectPath.getDeserializer().deserializeFrom(stream); int contentLength = stream.readInt(); ObjectNode objectNode = new ObjectNode(isEOF, offset, contentLength, filePath); objectNode.setSearchIndex(searchIndex); @@ -140,8 +144,9 @@ public static ObjectNode deserializeFromWAL(ByteBuffer buffer) { long searchIndex = buffer.getLong(); boolean isEOF = buffer.get() == 1; long offset = buffer.getLong(); - String filePath = ReadWriteIOUtils.readString(buffer); - Optional objectFile = TierManager.getInstance().getAbsoluteObjectFilePath(filePath); + IObjectPath filePath = IObjectPath.getDeserializer().deserializeFrom(buffer); + Optional objectFile = + TierManager.getInstance().getAbsoluteObjectFilePath(filePath.toString()); int contentLength = buffer.getInt(); byte[] contents = new byte[contentLength]; if (objectFile.isPresent()) { @@ -152,7 +157,7 @@ public static ObjectNode deserializeFromWAL(ByteBuffer buffer) { throw new RuntimeException(e); } } else { - throw new ObjectFileNotExist(filePath); + throw new ObjectFileNotExist(filePath.toString()); } ObjectNode objectNode = new ObjectNode(isEOF, offset, contents, filePath); @@ -163,7 +168,7 @@ public static ObjectNode deserializeFromWAL(ByteBuffer buffer) { public static ObjectNode deserialize(ByteBuffer byteBuffer) { boolean isEoF = ReadWriteIOUtils.readBool(byteBuffer); long offset = ReadWriteIOUtils.readLong(byteBuffer); - String filePath = ReadWriteIOUtils.readString(byteBuffer); + IObjectPath filePath = IObjectPath.getDeserializer().deserializeFrom(byteBuffer); int contentLength = ReadWriteIOUtils.readInt(byteBuffer); byte[] content = ReadWriteIOUtils.readBytes(byteBuffer, contentLength); return new ObjectNode(isEoF, offset, content, filePath); @@ -227,7 +232,7 @@ protected void serializeAttributes(ByteBuffer byteBuffer) { getType().serialize(byteBuffer); ReadWriteIOUtils.write(isEOF, byteBuffer); ReadWriteIOUtils.write(offset, byteBuffer); - ReadWriteIOUtils.write(filePath, byteBuffer); + filePath.serialize(byteBuffer); ReadWriteIOUtils.write(contentLength, byteBuffer); byteBuffer.put(content); } @@ -237,7 +242,7 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException { getType().serialize(stream); ReadWriteIOUtils.write(isEOF, stream); ReadWriteIOUtils.write(offset, stream); - ReadWriteIOUtils.write(filePath, stream); + filePath.serialize(stream); ReadWriteIOUtils.write(contentLength, stream); stream.write(content); } @@ -251,7 +256,8 @@ public ByteBuffer serialize() { byte[] contents = new byte[contentLength]; boolean readSuccess = false; for (int i = 0; i < 2; i++) { - Optional objectFile = TierManager.getInstance().getAbsoluteObjectFilePath(filePath); + Optional objectFile = + TierManager.getInstance().getAbsoluteObjectFilePath(filePath.toString()); if (objectFile.isPresent()) { try { readContentFromFile(objectFile.get(), contents); @@ -279,7 +285,7 @@ public ByteBuffer serialize() { } ReadWriteIOUtils.write(readSuccess && isEOF, stream); ReadWriteIOUtils.write(offset, stream); - ReadWriteIOUtils.write(filePath, stream); + filePath.serialize(stream); ReadWriteIOUtils.write(contentLength, stream); stream.write(contents); return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java index 2297ddcebdd0..6eeb3d7322fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java @@ -27,24 +27,24 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; +import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; import org.apache.tsfile.utils.Binary; -import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.iotdb.db.utils.ObjectTypeUtils.generateObjectBinary; + public class RelationalInsertRowsNode extends InsertRowsNode { // deviceId cache for Table-view insertion private IDeviceID[] deviceIDs; @@ -215,22 +215,22 @@ private void handleObjectValue( boolean isEoF = buffer.get() == 1; long offset = buffer.getLong(); byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining()); - String relativePath = - TsFileNameGenerator.generateObjectFilePath( + IObjectPath relativePath = + IObjectPath.Factory.FACTORY.create( dataRegionReplicaSet.getRegionId().getId(), insertRowNode.getTime(), insertRowNode.getDeviceID(), insertRowNode.getMeasurements()[j]); ObjectNode objectNode = new ObjectNode(isEoF, offset, content, relativePath); objectNode.setDataRegionReplicaSet(dataRegionReplicaSet); - byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8); - byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES]; - System.arraycopy( - BytesUtils.longToBytes(offset + content.length), 0, valueBytes, 0, Long.BYTES); - System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length); - ((Binary) values[j]).setValues(valueBytes); - insertRowNode.setValues(values); writePlanNodeList.add(objectNode); + if (isEoF) { + ((Binary) values[j]) + .setValues(generateObjectBinary(offset + content.length, relativePath).getValues()); + insertRowNode.setValues(values); + } else { + values[j] = null; + } } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index e3a114211e16..3255c11f1e96 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -32,7 +32,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; +import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.tsfile.enums.TSDataType; @@ -41,7 +41,6 @@ import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; -import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -50,13 +49,14 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import static org.apache.iotdb.db.utils.ObjectTypeUtils.generateObjectBinary; + public class RelationalInsertTabletNode extends InsertTabletNode { // deviceId cache for Table-view insertion @@ -465,24 +465,23 @@ private void handleObjectValue( Map.Entry> entry, List result) { for (int j = startRow; j < endRow; j++) { + if (((Binary[]) columns[column])[j] == null) { + continue; + } byte[] binary = ((Binary[]) columns[column])[j].getValues(); ByteBuffer buffer = ByteBuffer.wrap(binary); boolean isEoF = buffer.get() == 1; long offset = buffer.getLong(); byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining()); - String relativePath = - TsFileNameGenerator.generateObjectFilePath( + IObjectPath relativePath = + IObjectPath.Factory.FACTORY.create( entry.getKey().getRegionId().getId(), times[j], getDeviceID(j), measurements[column]); ObjectNode objectNode = new ObjectNode(isEoF, offset, content, relativePath); objectNode.setDataRegionReplicaSet(entry.getKey()); result.add(objectNode); if (isEoF) { - byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8); - byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES]; - System.arraycopy( - BytesUtils.longToBytes(offset + content.length), 0, valueBytes, 0, Long.BYTES); - System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length); - ((Binary[]) columns[column])[j] = new Binary(valueBytes); + ((Binary[]) columns[column])[j] = + generateObjectBinary(offset + content.length, relativePath); } else { ((Binary[]) columns[column])[j] = null; if (bitMaps == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java index 3049a9bc4415..a4ad4e25756a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java @@ -100,7 +100,8 @@ private void transform(Column column, ColumnBuilder columnBuilder, int i) { } private Binary readObject(Binary binary) { - Pair objectLengthPathPair = ObjectTypeUtils.parseObjectBinary(binary); + Pair objectLengthPathPair = + ObjectTypeUtils.parseObjectBinaryToSizeStringPathPair(binary); long fileLength = objectLengthPathPair.getLeft(); String relativePath = objectLengthPathPair.getRight(); int actualReadSize = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java new file mode 100644 index 000000000000..d0ea395502a9 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/Base32ObjectPath.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion; + +import com.google.common.io.BaseEncoding; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class Base32ObjectPath implements IObjectPath { + + private final Path path; + private int serializedSize = -1; + + private static final Deserializer DESERIALIZER = + new Deserializer() { + @Override + public IObjectPath deserializeFrom(ByteBuffer byteBuffer) { + return deserialize(byteBuffer); + } + + @Override + public IObjectPath deserializeFrom(InputStream inputStream) throws IOException { + return deserialize(inputStream); + } + + @Override + public IObjectPath deserializeFromObjectValue(ByteBuffer byteBuffer) { + return deserialize(byteBuffer); + } + }; + + private static final Factory FACTORY = Base32ObjectPath::new; + + private Base32ObjectPath(String first, String... more) { + path = Paths.get(first, more); + } + + public Base32ObjectPath(Path path) { + this.path = path; + } + + public Base32ObjectPath(int regionId, long time, IDeviceID iDeviceID, String measurement) { + Object[] segments = iDeviceID.getSegments(); + String[] pathSegments = new String[segments.length + 2]; + for (int i = 0; i < segments.length; i++) { + Object segment = segments[i]; + String segmentString = segment == null ? "null" : segment.toString(); + pathSegments[i] = + BaseEncoding.base32() + .omitPadding() + .encode(segmentString.getBytes(StandardCharsets.UTF_8)); + } + pathSegments[pathSegments.length - 2] = + BaseEncoding.base32().omitPadding().encode(measurement.getBytes(StandardCharsets.UTF_8)); + pathSegments[pathSegments.length - 1] = time + ".bin"; + path = Paths.get(String.valueOf(regionId), pathSegments); + } + + @Override + public int serialize(ByteBuffer byteBuffer) { + int cnt = 0; + cnt += ReadWriteForEncodingUtils.writeUnsignedVarInt(path.getNameCount(), byteBuffer); + for (Path segment : path) { + cnt += ReadWriteIOUtils.writeVar(segment.toString(), byteBuffer); + } + return cnt; + } + + @Override + public int serialize(OutputStream outputStream) throws IOException { + int cnt = 0; + cnt += ReadWriteForEncodingUtils.writeUnsignedVarInt(path.getNameCount(), outputStream); + for (Path segment : path) { + cnt += ReadWriteIOUtils.writeVar(segment.toString(), outputStream); + } + return cnt; + } + + @Override + public int getSerializedSize() { + if (serializedSize != -1) { + return serializedSize; + } + int cnt = ReadWriteForEncodingUtils.varIntSize(path.getNameCount()); + for (Path segment : path) { + byte[] bytes = segment.toString().getBytes(StandardCharsets.UTF_8); + cnt += ReadWriteForEncodingUtils.varIntSize(bytes.length); + cnt += bytes.length; + } + serializedSize = cnt; + return cnt; + } + + @Override + public void serializeToObjectValue(ByteBuffer byteBuffer) { + serialize(byteBuffer); + } + + @Override + public int getSerializeSizeToObjectValue() { + return getSerializedSize(); + } + + public static Base32ObjectPath deserialize(ByteBuffer byteBuffer) { + int cnt = ReadWriteForEncodingUtils.readUnsignedVarInt(byteBuffer); + String first = ReadWriteIOUtils.readVarIntString(byteBuffer); + String[] more = new String[cnt - 1]; + + for (int i = 0; i < cnt - 1; ++i) { + more[i] = ReadWriteIOUtils.readVarIntString(byteBuffer); + } + return new Base32ObjectPath(first, more); + } + + public static Base32ObjectPath deserialize(InputStream stream) throws IOException { + int cnt = ReadWriteForEncodingUtils.readUnsignedVarInt(stream); + String first = ReadWriteIOUtils.readVarIntString(stream); + String[] more = new String[cnt - 1]; + + for (int i = 0; i < cnt - 1; ++i) { + more[i] = ReadWriteIOUtils.readVarIntString(stream); + } + + return new Base32ObjectPath(first, more); + } + + @Override + public String toString() { + return path.toString(); + } + + public Path getPath() { + return path; + } + + public static Factory getFACTORY() { + return FACTORY; + } + + public static Deserializer getDESERIALIZER() { + return DESERIALIZER; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 4db15d48bc67..09c9ee843041 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -3584,7 +3584,7 @@ public int compact() { public void writeObject(ObjectNode objectNode) throws Exception { writeLock("writeObject"); try { - String relativeTmpPathString = objectNode.getFilePath() + ".tmp"; + String relativeTmpPathString = objectNode.getFilePathString() + ".tmp"; String objectFileDir = TierManager.getInstance().getNextFolderForObjectFile(); File objectTmpFile = FSFactoryProducer.getFSFactory().getFile(objectFileDir, relativeTmpPathString); @@ -3596,9 +3596,9 @@ public void writeObject(ObjectNode objectNode) throws Exception { } if (objectNode.isEOF()) { File objectFile = - FSFactoryProducer.getFSFactory().getFile(objectFileDir, objectNode.getFilePath()); + FSFactoryProducer.getFSFactory().getFile(objectFileDir, objectNode.getFilePathString()); if (objectFile.exists()) { - String relativeBackPathString = objectNode.getFilePath() + ".back"; + String relativeBackPathString = objectNode.getFilePathString() + ".back"; File objectBackFile = FSFactoryProducer.getFSFactory().getFile(objectFileDir, relativeBackPathString); Files.move( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java new file mode 100644 index 000000000000..c340aae440c0 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IObjectPath.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion; + +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; + +import org.apache.tsfile.file.metadata.IDeviceID; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public interface IObjectPath { + + IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + + int serialize(ByteBuffer byteBuffer); + + int serialize(OutputStream outputStream) throws IOException; + + int getSerializedSize(); + + void serializeToObjectValue(ByteBuffer byteBuffer); + + int getSerializeSizeToObjectValue(); + + interface Factory { + + IObjectPath create(int regionId, long time, IDeviceID iDeviceID, String measurement); + + Factory FACTORY = + CONFIG.getRestrictObjectLimit() + ? PlainObjectPath.getFACTORY() + : Base32ObjectPath.getFACTORY(); + } + + interface Deserializer { + + IObjectPath deserializeFrom(ByteBuffer byteBuffer); + + IObjectPath deserializeFrom(InputStream inputStream) throws IOException; + + IObjectPath deserializeFromObjectValue(ByteBuffer byteBuffer); + } + + static Deserializer getDeserializer() { + return CONFIG.getRestrictObjectLimit() + ? PlainObjectPath.getDESERIALIZER() + : Base32ObjectPath.getDESERIALIZER(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java new file mode 100644 index 000000000000..3ae3a925a3c9 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PlainObjectPath.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +public class PlainObjectPath implements IObjectPath { + + private final String filePath; + + private static final Deserializer DESERIALIZER = + new Deserializer() { + @Override + public IObjectPath deserializeFrom(ByteBuffer byteBuffer) { + return deserialize(byteBuffer); + } + + @Override + public IObjectPath deserializeFrom(InputStream inputStream) throws IOException { + return deserialize(inputStream); + } + + @Override + public IObjectPath deserializeFromObjectValue(ByteBuffer byteBuffer) { + return deserializeObjectValue(byteBuffer); + } + }; + + private static final Factory FACTORY = PlainObjectPath::new; + + public PlainObjectPath(String filePath) { + this.filePath = filePath; + } + + public PlainObjectPath(int regionId, long time, IDeviceID iDeviceID, String measurement) { + String objectFileName = time + ".bin"; + Object[] segments = iDeviceID.getSegments(); + StringBuilder relativePathString = + new StringBuilder(String.valueOf(regionId)).append(File.separator); + for (Object segment : segments) { + relativePathString + .append(segment == null ? "null" : segment.toString().toLowerCase()) + .append(File.separator); + } + relativePathString.append(measurement).append(File.separator); + relativePathString.append(objectFileName); + this.filePath = relativePathString.toString(); + } + + @Override + public int serialize(ByteBuffer byteBuffer) { + return ReadWriteIOUtils.write(filePath, byteBuffer); + } + + @Override + public int serialize(OutputStream outputStream) throws IOException { + return ReadWriteIOUtils.write(filePath, outputStream); + } + + @Override + public int getSerializedSize() { + return ReadWriteIOUtils.sizeToWrite(filePath); + } + + @Override + public void serializeToObjectValue(ByteBuffer byteBuffer) { + byteBuffer.put(filePath.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public int getSerializeSizeToObjectValue() { + return filePath.getBytes(StandardCharsets.UTF_8).length; + } + + public static PlainObjectPath deserialize(ByteBuffer byteBuffer) { + String filePath = ReadWriteIOUtils.readString(byteBuffer); + return new PlainObjectPath(filePath); + } + + public static PlainObjectPath deserialize(InputStream stream) throws IOException { + String filePath = ReadWriteIOUtils.readString(stream); + return new PlainObjectPath(filePath); + } + + public static PlainObjectPath deserializeObjectValue(ByteBuffer byteBuffer) { + return new PlainObjectPath(StandardCharsets.UTF_8.decode(byteBuffer).toString()); + } + + @Override + public String toString() { + return filePath; + } + + public static Factory getFACTORY() { + return FACTORY; + } + + public static Deserializer getDESERIALIZER() { + return DESERIALIZER; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java index 0262eb361700..bac487291671 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java @@ -533,7 +533,8 @@ public static void removeDeletedObjectFiles( TsFileSequenceReader reader, List alignedChunkMetadataList, List timeMods, - List> valueMods) + List> valueMods, + int currentRegionId) throws IOException { if (alignedChunkMetadataList.isEmpty()) { return; @@ -578,7 +579,8 @@ public static void removeDeletedObjectFiles( objectColumnIndexList, timeDeletionIntervalList, objectDeletionIntervalList, - deletionCursors); + deletionCursors, + currentRegionId); } } @@ -589,7 +591,8 @@ private static void removeDeletedObjectFiles( List objectColumnIndexList, List timeDeletions, List> objectDeletions, - int[] deletionCursors) + int[] deletionCursors, + int currentRegionId) throws IOException { Chunk timeChunk = reader.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata()); @@ -612,6 +615,12 @@ private static void removeDeletedObjectFiles( continue; } Chunk chunk = reader.readMemChunk(valueChunkMetadata); + if (chunk != null) { + chunk + .getHeader() + .setReplaceDecoder( + decoder -> ObjectTypeUtils.getReplaceDecoder(decoder, currentRegionId)); + } valueChunks.add(chunk); valuePages.add( chunk == null diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index 7b99541c006e..c07f5a1ad404 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -489,7 +489,8 @@ private void applyModificationForAlignedChunkMetadataList( readerMap.get(tsFileResource), alignedChunkMetadataList, Collections.singletonList(ttlDeletion), - modificationForValueColumns.stream().map(v -> emptyList).collect(Collectors.toList())); + modificationForValueColumns.stream().map(v -> emptyList).collect(Collectors.toList()), + tsFileResource.getTsFileID().regionId); } ModificationUtils.modifyAlignedChunkMetaData( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java index 166417a97eaf..7a8e884a4913 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java @@ -284,7 +284,8 @@ protected List getAlignedChunkMetadataList(TsFileR readerCacheMap.get(resource), alignedChunkMetadataList, Collections.singletonList(ttlDeletion), - valueModifications.stream().map(v -> emptyList).collect(Collectors.toList())); + valueModifications.stream().map(v -> emptyList).collect(Collectors.toList()), + resource.getTsFileID().regionId); } // modify aligned chunk metadatas diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java index a94150deca19..3cbc0c2fbae1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java @@ -92,7 +92,7 @@ public IPointReader getPagePointReader( ByteBuffer compressedTimePageData, List compressedValuePageDatas) throws IOException { - return getPontReader( + return getPointReader( timePageHeader, valuePageHeaders, compressedTimePageData, @@ -106,11 +106,11 @@ public IPointReader getBatchedPagePointReader( ByteBuffer compressedTimePageData, List compressedValuePageDatas) throws IOException { - return getPontReader( + return getPointReader( timePageHeader, valuePageHeaders, compressedTimePageData, compressedValuePageDatas, false); } - private IPointReader getPontReader( + private IPointReader getPointReader( PageHeader timePageHeader, List valuePageHeaders, ByteBuffer compressedTimePageData, @@ -146,7 +146,7 @@ private IPointReader getPontReader( valuePageHeaders.get(i), uncompressedPageData, valueType, - Decoder.getDecoderByType(valueChunkHeader.getEncodingType(), valueType)); + valueChunkHeader.calculateDecoderForNonTimeChunk()); valuePageReader.setDeleteIntervalList(valueDeleteIntervalList.get(i)); valuePageReaders.add(valuePageReader); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java index a7c6eb96d429..27883b34e9ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java @@ -24,7 +24,9 @@ import org.apache.iotdb.db.storageengine.buffer.ChunkCache; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.utils.ObjectTypeUtils; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IChunkMetadata; @@ -92,7 +94,7 @@ public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter globalTi context); List valueChunkList = new ArrayList<>(); for (IChunkMetadata valueChunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) { - valueChunkList.add( + Chunk chunk = valueChunkMetadata == null ? null : ChunkCache.getInstance() @@ -104,7 +106,17 @@ public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter globalTi resource.isClosed()), valueChunkMetadata.getDeleteIntervalList(), valueChunkMetadata.getStatistics(), - context)); + context); + final TsFileID tsFileID = getTsFileID(); + if (chunk != null + && tsFileID.regionId > 0 + && chunkMetaData.getDataType() == TSDataType.OBJECT) { + chunk + .getHeader() + .setReplaceDecoder( + decoder -> ObjectTypeUtils.getReplaceDecoder(decoder, tsFileID.regionId)); + } + valueChunkList.add(chunk); } long t2 = System.nanoTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskChunkLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskChunkLoader.java index be33428ae65f..a2ff24233d9f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskChunkLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskChunkLoader.java @@ -24,7 +24,9 @@ import org.apache.iotdb.db.storageengine.buffer.ChunkCache; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.utils.ObjectTypeUtils; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.read.common.Chunk; @@ -85,6 +87,14 @@ public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter globalTi chunkMetaData.getStatistics(), context); + final TsFileID tsFileID = getTsFileID(); + if (tsFileID.regionId > 0 && chunkMetaData.getDataType() == TSDataType.OBJECT) { + chunk + .getHeader() + .setReplaceDecoder( + decoder -> ObjectTypeUtils.getReplaceDecoder(decoder, tsFileID.regionId)); + } + long t2 = System.nanoTime(); IChunkReader chunkReader = new ChunkReader(chunk, globalTimeFilter); SeriesScanCostMetricSet.getInstance() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java index 8ad353a96ef6..16be82188e9c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java @@ -28,7 +28,6 @@ import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.tsfile.common.constant.TsFileConstant; -import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.fileSystem.FSFactoryProducer; import org.apache.tsfile.fileSystem.fsFactory.FSFactory; import org.slf4j.Logger; @@ -176,22 +175,6 @@ public static TsFileName getTsFileName(String fileName) throws IOException { } } - public static String generateObjectFilePath( - int regionId, long time, IDeviceID iDeviceID, String measurement) { - String objectFileName = time + ".bin"; - Object[] segments = iDeviceID.getSegments(); - StringBuilder relativePathString = - new StringBuilder(String.valueOf(regionId)).append(File.separator); - for (Object segment : segments) { - relativePathString - .append(segment == null ? "null" : segment.toString().toLowerCase()) - .append(File.separator); - } - relativePathString.append(measurement).append(File.separator); - relativePathString.append(objectFileName); - return relativePathString.toString(); - } - @TestOnly public static TsFileResource increaseCrossCompactionCnt(TsFileResource tsFileResource) throws IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java index 6355880cebd2..a5fa8b54e7b1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.storageengine.rescon.disk; import org.apache.iotdb.commons.conf.IoTDBConstant; -import org.apache.iotdb.commons.exception.ObjectFileNotExist; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; @@ -345,16 +344,6 @@ private long[] getTierDiskSpace(DiskSpaceType type) { return tierDiskSpace; } - public File getObjectFile(String relativePath) { - for (String folder : objectDirs) { - File file = new File(folder, relativePath); - if (file.exists()) { - return file; - } - } - throw new ObjectFileNotExist(relativePath); - } - private enum DiskSpaceType { TOTAL, USABLE, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java index 7dd142b77ce6..aebcaf9a4dd3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java @@ -30,13 +30,19 @@ import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.service.metrics.FileMetrics; +import org.apache.iotdb.db.storageengine.dataregion.Base32ObjectPath; +import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; +import org.apache.iotdb.db.storageengine.dataregion.PlainObjectPath; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.encoding.decoder.Decoder; +import org.apache.tsfile.encoding.decoder.DecoderWrapper; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +51,7 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.Collections; @@ -60,7 +67,8 @@ private ObjectTypeUtils() {} public static ByteBuffer readObjectContent( Binary binary, long offset, int length, boolean mayNotInCurrentNode) { - Pair objectLengthPathPair = ObjectTypeUtils.parseObjectBinary(binary); + Pair objectLengthPathPair = + ObjectTypeUtils.parseObjectBinaryToSizeStringPathPair(binary); long fileLength = objectLengthPathPair.getLeft(); String relativePath = objectLengthPathPair.getRight(); int actualReadSize = @@ -147,6 +155,69 @@ private static ByteBuffer readObjectContentFromRemoteFile( return buffer; } + public static Binary generateObjectBinary(long objectSize, IObjectPath objectPath) { + byte[] valueBytes = new byte[objectPath.getSerializeSizeToObjectValue() + Long.BYTES]; + ByteBuffer buffer = ByteBuffer.wrap(valueBytes); + ReadWriteIOUtils.write(objectSize, buffer); + objectPath.serializeToObjectValue(buffer); + return new Binary(buffer.array()); + } + + public static DecoderWrapper getReplaceDecoder(final Decoder decoder, final int newRegionId) { + return new ObjectRegionIdReplaceDecoder(decoder, newRegionId); + } + + private static class ObjectRegionIdReplaceDecoder extends DecoderWrapper { + + private final int newRegionId; + + public ObjectRegionIdReplaceDecoder(Decoder decoder, int newRegionId) { + super(decoder); + this.newRegionId = newRegionId; + } + + @Override + public Binary readBinary(ByteBuffer buffer) { + Binary originValue = originDecoder.readBinary(buffer); + return ObjectTypeUtils.replaceRegionIdForObjectBinary(newRegionId, originValue); + } + } + + public static Binary replaceRegionIdForObjectBinary(int newRegionId, Binary originValue) { + Pair pair = + ObjectTypeUtils.parseObjectBinaryToSizeIObjectPathPair(originValue); + IObjectPath objectPath = pair.getRight(); + try { + Path path; + if (objectPath instanceof PlainObjectPath) { + path = Paths.get(objectPath.toString()); + } else { + path = ((Base32ObjectPath) objectPath).getPath(); + } + int regionId = Integer.parseInt(path.getName(0).toString()); + if (regionId == newRegionId) { + return originValue; + } + IObjectPath newObjectPath; + if (objectPath instanceof PlainObjectPath) { + String newPath = objectPath.toString().replaceFirst(regionId + "", newRegionId + ""); + newObjectPath = new PlainObjectPath(newPath); + } else { + String[] subPath = new String[path.getNameCount() - 1]; + for (int i = 1; i < path.getNameCount(); i++) { + subPath[i - 1] = path.getName(i).toString(); + } + Path newPath = Paths.get(newRegionId + "", subPath); + newObjectPath = new Base32ObjectPath(newPath); + } + return ObjectTypeUtils.generateObjectBinary(pair.getLeft(), newObjectPath); + } catch (NumberFormatException e) { + throw new IoTDBRuntimeException( + "wrong object file path: " + pair.getRight(), + TSStatusCode.OBJECT_READ_ERROR.getStatusCode()); + } + } + public static int getActualReadSize(String filePath, long fileSize, long offset, long length) { if (offset >= fileSize) { throw new SemanticException( @@ -164,15 +235,23 @@ public static int getActualReadSize(String filePath, long fileSize, long offset, return (int) actualReadSize; } - public static Pair parseObjectBinary(Binary binary) { + public static Pair parseObjectBinaryToSizeStringPathPair(Binary binary) { byte[] bytes = binary.getValues(); ByteBuffer buffer = ByteBuffer.wrap(bytes); long length = buffer.getLong(); String relativeObjectFilePath = - new String(bytes, 8, bytes.length - 8, TSFileConfig.STRING_CHARSET); + IObjectPath.getDeserializer().deserializeFromObjectValue(buffer).toString(); return new Pair<>(length, relativeObjectFilePath); } + public static Pair parseObjectBinaryToSizeIObjectPathPair(Binary binary) { + byte[] bytes = binary.getValues(); + ByteBuffer buffer = ByteBuffer.wrap(bytes); + long length = buffer.getLong(); + IObjectPath objectPath = IObjectPath.getDeserializer().deserializeFromObjectValue(buffer); + return new Pair<>(length, objectPath); + } + public static long getObjectLength(Binary binary) { byte[] bytes = binary.getValues(); ByteBuffer wrap = ByteBuffer.wrap(bytes); @@ -189,8 +268,9 @@ public static Optional getObjectPathFromBinary(Binary binary) { public static Optional getNullableObjectPathFromBinary( Binary binary, boolean needTempFile) { byte[] bytes = binary.getValues(); + ByteBuffer buffer = ByteBuffer.wrap(bytes, 8, bytes.length - 8); String relativeObjectFilePath = - new String(bytes, 8, bytes.length - 8, TSFileConfig.STRING_CHARSET); + IObjectPath.getDeserializer().deserializeFromObjectValue(buffer).toString(); return TIER_MANAGER.getAbsoluteObjectFilePath(relativeObjectFilePath, needTempFile); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java index c5e745aea87e..98f2428e8d58 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.queryengine.plan.function; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator; @@ -54,10 +56,13 @@ public class RecordObjectTypeTest { + private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private File objectDir; @Before public void setup() { + config.setRestrictObjectLimit(true); try { objectDir = new File(TierManager.getInstance().getNextFolderForObjectFile()); } catch (DiskSpaceInsufficientException e) { @@ -73,6 +78,7 @@ public void tearDown() throws IOException { Files.delete(file.toPath()); } } + config.setRestrictObjectLimit(false); } @Test diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java index 0a0d73d2ac31..022257d1ee63 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; @@ -48,10 +50,13 @@ public class ObjectTypeFunctionTest { + private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private File objectDir; @Before public void setup() { + config.setRestrictObjectLimit(true); try { objectDir = new File(TierManager.getInstance().getNextFolderForObjectFile()); } catch (DiskSpaceInsufficientException e) { @@ -67,6 +72,7 @@ public void tearDown() throws IOException { Files.delete(file.toPath()); } } + config.setRestrictObjectLimit(false); } @Test diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java index 4bc11fbec572..4fd7e8c432d9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java @@ -23,9 +23,15 @@ import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema; import org.apache.iotdb.commons.schema.table.column.TagColumnSchema; +import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; +import org.apache.iotdb.db.storageengine.dataregion.Base32ObjectPath; +import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; +import org.apache.iotdb.db.storageengine.dataregion.PlainObjectPath; import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; @@ -36,6 +42,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; +import org.apache.iotdb.db.utils.ObjectTypeUtils; import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; @@ -76,11 +83,15 @@ public class ObjectTypeCompactionTest extends AbstractCompactionTest { private String threadName; private File objectDir; + private File regionDir; + + private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @Before @Override public void setUp() throws IOException, WriteProcessException, MetadataException, InterruptedException { + config.setRestrictObjectLimit(true); this.threadName = Thread.currentThread().getName(); Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1"); DataNodeTableCache.getInstance().invalid(this.COMPACTION_TEST_SG); @@ -88,6 +99,8 @@ public void setUp() super.setUp(); try { objectDir = new File(TierManager.getInstance().getNextFolderForObjectFile()); + regionDir = new File(objectDir, "0"); + regionDir.mkdirs(); } catch (DiskSpaceInsufficientException e) { throw new RuntimeException(e); } @@ -102,9 +115,10 @@ public void tearDown() throws IOException, StorageEngineException { File[] files = objectDir.listFiles(); if (files != null) { for (File file : files) { - Files.delete(file.toPath()); + FileUtils.deleteFileOrDirectory(file); } } + config.setRestrictObjectLimit(false); } public void createTable(String tableName, long ttl) { @@ -120,9 +134,9 @@ public void createTable(String tableName, long ttl) { @Test public void testSeqCompactionWithTTL() throws IOException, WriteProcessException { Pair pair1 = - generateTsFileAndObject(true, System.currentTimeMillis() - 10000); + generateTsFileAndObject(true, System.currentTimeMillis() - 10000, 0); Pair pair2 = - generateTsFileAndObject(true, System.currentTimeMillis() + 1000000); + generateTsFileAndObject(true, System.currentTimeMillis() + 1000000, 100); tsFileManager.add(pair1.getLeft(), true); tsFileManager.add(pair2.getLeft(), true); InnerSpaceCompactionTask task = @@ -141,9 +155,9 @@ public void testSeqCompactionWithTTL() throws IOException, WriteProcessException @Test public void testUnseqCompactionWithTTL() throws IOException, WriteProcessException { Pair pair1 = - generateTsFileAndObject(false, System.currentTimeMillis() + 100000); + generateTsFileAndObject(false, System.currentTimeMillis() + 100000, 1); Pair pair2 = - generateTsFileAndObject(false, System.currentTimeMillis() - 1000000); + generateTsFileAndObject(false, System.currentTimeMillis() - 1000000, 0); tsFileManager.add(pair1.getLeft(), false); tsFileManager.add(pair2.getLeft(), false); InnerSpaceCompactionTask task = @@ -162,9 +176,9 @@ public void testUnseqCompactionWithTTL() throws IOException, WriteProcessExcepti @Test public void testUnseqCompactionWithReadPointWithTTL() throws IOException, WriteProcessException { Pair pair1 = - generateTsFileAndObject(false, System.currentTimeMillis() + 100000); + generateTsFileAndObject(false, System.currentTimeMillis() + 100000, 0); Pair pair2 = - generateTsFileAndObject(false, System.currentTimeMillis() - 1000000); + generateTsFileAndObject(false, System.currentTimeMillis() - 1000000, 0); tsFileManager.add(pair1.getLeft(), false); tsFileManager.add(pair2.getLeft(), false); InnerSpaceCompactionTask task = @@ -183,9 +197,9 @@ public void testUnseqCompactionWithReadPointWithTTL() throws IOException, WriteP @Test public void testCrossCompactionWithTTL() throws IOException, WriteProcessException { Pair pair1 = - generateTsFileAndObject(true, System.currentTimeMillis() + 100000); + generateTsFileAndObject(true, System.currentTimeMillis() + 100000, 1); Pair pair2 = - generateTsFileAndObject(false, System.currentTimeMillis() - 1000000); + generateTsFileAndObject(false, System.currentTimeMillis() - 1000000, 2); tsFileManager.add(pair1.getLeft(), true); tsFileManager.add(pair2.getLeft(), false); CrossSpaceCompactionTask task = @@ -205,9 +219,9 @@ public void testCrossCompactionWithTTL() throws IOException, WriteProcessExcepti @Test public void testSettleCompaction() throws IOException, WriteProcessException { Pair pair1 = - generateTsFileAndObject(true, System.currentTimeMillis() - 10000); + generateTsFileAndObject(true, System.currentTimeMillis() - 10000, 3); Pair pair2 = - generateTsFileAndObject(true, System.currentTimeMillis() + 1000000); + generateTsFileAndObject(true, System.currentTimeMillis() + 1000000, 0); tsFileManager.add(pair1.getLeft(), true); tsFileManager.add(pair2.getLeft(), true); SettleCompactionTask task = @@ -224,19 +238,58 @@ public void testSettleCompaction() throws IOException, WriteProcessException { Assert.assertTrue(pair2.getRight().exists()); } - private Pair generateTsFileAndObject(boolean seq, long timestamp) - throws IOException, WriteProcessException { + @Test + public void testPlainObjectBinaryReplaceRegionId() { + IObjectPath objectPath = new PlainObjectPath(1, 0, new StringArrayDeviceID("t1.d1"), "s1"); + ByteBuffer buffer = + ByteBuffer.allocate(Long.BYTES + objectPath.getSerializeSizeToObjectValue()); + buffer.putLong(10); + objectPath.serializeToObjectValue(buffer); + + Binary origin = new Binary(buffer.array()); + Binary result = ObjectTypeUtils.replaceRegionIdForObjectBinary(10, origin); + ByteBuffer deserializeBuffer = ByteBuffer.wrap(result.getValues()); + deserializeBuffer.getLong(); + Assert.assertEquals( + new PlainObjectPath(10, 0, new StringArrayDeviceID("t1.d1"), "s1").toString(), + IObjectPath.getDeserializer().deserializeFromObjectValue(deserializeBuffer).toString()); + } + + @Test + public void testBase32ObjectBinaryReplaceRegionId() { + config.setRestrictObjectLimit(false); + try { + IObjectPath objectPath = new Base32ObjectPath(1, 0, new StringArrayDeviceID("t1.d1"), "s1"); + ByteBuffer buffer = + ByteBuffer.allocate(Long.BYTES + objectPath.getSerializeSizeToObjectValue()); + buffer.putLong(10); + objectPath.serializeToObjectValue(buffer); + + Binary origin = new Binary(buffer.array()); + Binary result = ObjectTypeUtils.replaceRegionIdForObjectBinary(10, origin); + ByteBuffer deserializeBuffer = ByteBuffer.wrap(result.getValues()); + deserializeBuffer.getLong(); + Assert.assertEquals( + new Base32ObjectPath(10, 0, new StringArrayDeviceID("t1.d1"), "s1").toString(), + IObjectPath.getDeserializer().deserializeFromObjectValue(deserializeBuffer).toString()); + } finally { + config.setRestrictObjectLimit(true); + } + } + + private Pair generateTsFileAndObject( + boolean seq, long timestamp, int regionIdInTsFile) throws IOException, WriteProcessException { TsFileResource resource = createEmptyFileAndResource(seq); - Path testFile1 = Files.createTempFile(objectDir.toPath(), "test_", ".bin"); + Path testFile1 = Files.createTempFile(regionDir.toPath(), "test_", ".bin"); byte[] content = new byte[100]; for (int i = 0; i < 100; i++) { content[i] = (byte) i; } Files.write(testFile1, content); - String relativePath = testFile1.toFile().getName(); - ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + relativePath.length()); + String relativePathInTsFile = regionIdInTsFile + File.separator + testFile1.toFile().getName(); + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + relativePathInTsFile.length()); buffer.putLong(100L); - buffer.put(BytesUtils.stringToBytes(relativePath)); + buffer.put(BytesUtils.stringToBytes(relativePathInTsFile)); buffer.flip(); IDeviceID deviceID = new StringArrayDeviceID("t1", "d1"); try (TsFileIOWriter writer = new TsFileIOWriter(resource.getTsFile())) { diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 0a9a47ee8907..4330a11b1f60 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -1307,6 +1307,11 @@ tier_ttl_in_ms=-1 # Datatype: long max_object_file_size_in_byte=4294967296 +# There are no special restrictions on table names, column names, and device names of the OBJECT type. +# effectiveMode: first_start +# Datatype: boolean +restrict_object_limit=false + #################### ### Compaction Configurations #################### diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index f2b8ec6b8b07..01bde7c378dc 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -50,6 +50,7 @@ struct TGlobalConfig { 11: optional i32 tagAttributeTotalSize 12: optional bool isEnterprise 13: optional i64 timePartitionOrigin + 14: optional bool restrictObjectLimit } struct TRatisConfig {