From f1d8fce46c2e967d89fc9e07c796a39b7645c80a Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 22 May 2024 19:00:06 +0800 Subject: [PATCH] Pipe: Fix PipeSetTTLPlan is not handled correctly (#12571) --- .../iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java | 5 ++--- .../manager/pipe/event/PipeConfigRegionSnapshotEvent.java | 1 + .../pipe/extractor/ConfigRegionListeningFilter.java | 8 ++++++-- .../PipeConfigPhysicalPlanPatternParseVisitor.java | 6 +++++- .../persistence/executor/ConfigPlanExecutor.java | 8 +++++--- .../PipeConfigPhysicalPlanPatternParseVisitorTest.java | 4 ++-- 6 files changed, 21 insertions(+), 11 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java index 70316d3b600a..858af4cb5766 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java @@ -119,8 +119,7 @@ public void testTemplateInclusion() throws Exception { final Map connectorAttributes = new HashMap<>(); extractorAttributes.put("extractor.inclusion", "data, schema"); - extractorAttributes.put( - "extractor.inclusion.exclusion", "schema.timeseries.ordinary, schema.ttl"); + extractorAttributes.put("extractor.inclusion.exclusion", "schema.timeseries.ordinary"); extractorAttributes.put("extractor.path", "root.ln.**"); connectorAttributes.put("connector", "iotdb-thrift-connector"); @@ -154,7 +153,7 @@ public void testTemplateInclusion() throws Exception { "Database,TTL,SchemaReplicationFactor,DataReplicationFactor,TimePartitionInterval,", // Receiver's SchemaReplicationFactor/DataReplicationFactor shall be 3/2 regardless of the // sender - Collections.singleton("root.ln,null,3,2,604800000,")); + Collections.singleton("root.ln,3600000,3,2,604800000,")); TestUtils.assertDataEventuallyOnEnv( receiverEnv, "select * from root.**", diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java index 407dec4887e7..9e7a252eb6e7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java @@ -78,6 +78,7 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent { Arrays.asList( ConfigPhysicalPlanType.CreateDatabase.getPlanType(), ConfigPhysicalPlanType.SetTTL.getPlanType(), + ConfigPhysicalPlanType.PipeSetTTL.getPlanType(), ConfigPhysicalPlanType.CreateSchemaTemplate.getPlanType(), ConfigPhysicalPlanType.CommitSetSchemaTemplate.getPlanType())))); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java index 2f1ee641ba6e..f782f3e06171 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java @@ -83,7 +83,9 @@ public class ConfigRegionListeningFilter { Collections.singletonList(ConfigPhysicalPlanType.DropSchemaTemplate)); OPTION_PLAN_MAP.put( new PartialPath("schema.timeseries.template.unset"), - Collections.singletonList(ConfigPhysicalPlanType.PipeUnsetTemplate)); + Collections.unmodifiableList( + Arrays.asList( + ConfigPhysicalPlanType.UnsetTemplate, ConfigPhysicalPlanType.PipeUnsetTemplate))); OPTION_PLAN_MAP.put( new PartialPath("schema.timeseries.ordinary.drop"), @@ -96,7 +98,9 @@ public class ConfigRegionListeningFilter { Collections.singletonList(ConfigPhysicalPlanType.PipeDeactivateTemplate)); OPTION_PLAN_MAP.put( - new PartialPath("schema.ttl"), Collections.singletonList(ConfigPhysicalPlanType.SetTTL)); + new PartialPath("schema.ttl"), + Collections.unmodifiableList( + Arrays.asList(ConfigPhysicalPlanType.SetTTL, ConfigPhysicalPlanType.PipeSetTTL))); OPTION_PLAN_MAP.put( new PartialPath("auth.role.create"), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitor.java index e5ea0cccd3b3..93424df84166 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitor.java @@ -48,6 +48,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -269,8 +270,11 @@ public Optional visitPipeDeactivateTemplate( @Override public Optional visitTTL( final SetTTLPlan setTTLPlan, final IoTDBPipePattern pattern) { + final PartialPath databasePath = new PartialPath(setTTLPlan.getDatabasePathPattern()); final List intersectionList = - pattern.getIntersection(new PartialPath(setTTLPlan.getDatabasePathPattern())); + pattern.matchPrefixPath(databasePath.getFullPath()) + ? Collections.singletonList(databasePath) + : pattern.getIntersection(databasePath); return !intersectionList.isEmpty() ? Optional.of( new PipeSetTTLPlan( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index f47634065650..aa65375290a1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -507,9 +507,11 @@ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan) // Will not be actually executed. return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); case PipeUnsetTemplate: - // PipeUnsetTemplate plan will not be written here, and exists only after pipe sender - // collects UnsetTemplatePlan and before receiver calls ConfigManager. - throw new UnsupportedOperationException("PipeUnsetTemplate is not supported."); + case PipeSetTTL: + // PipeUnsetTemplate/PipeSetTTL plan will not be written here, and exists only after pipe + // sender collects UnsetTemplatePlan/SetTTLPlan and before receiver calls ConfigManager. + throw new UnsupportedOperationException( + String.format("Plan type %s is not supported.", physicalPlan.getType())); case TestOnly: return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); default: diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitorTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitorTest.java index a2ff81a9da44..5b341d11da80 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitorTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitorTest.java @@ -391,13 +391,13 @@ public void testSetTTL() throws IllegalPathException { ((PipeSetTTLPlan) IoTDBConfigRegionExtractor.PATTERN_PARSE_VISITOR .visitTTL( - new SetTTLPlan(Arrays.asList("root", "*", "device", "s1"), Long.MAX_VALUE), + new SetTTLPlan(Arrays.asList("root", "db", "**"), Long.MAX_VALUE), prefixPathPattern) .orElseThrow(AssertionError::new)) .getSetTTLPlans(); Assert.assertEquals( - Collections.singletonList(new PartialPath("root.db.device.s1")), + Collections.singletonList(new PartialPath("root.db.device.**")), plans.stream() .map(setTTLPlan -> new PartialPath(setTTLPlan.getDatabasePathPattern())) .collect(Collectors.toList()));