Skip to content

Commit

Permalink
Pipe: Fix PipeSetTTLPlan is not handled correctly (#12571)
Browse files Browse the repository at this point in the history
  • Loading branch information
Caideyipi authored May 22, 2024
1 parent 7d4dd9e commit f1d8fce
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ public void testTemplateInclusion() throws Exception {
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.inclusion", "data, schema");
extractorAttributes.put(
"extractor.inclusion.exclusion", "schema.timeseries.ordinary, schema.ttl");
extractorAttributes.put("extractor.inclusion.exclusion", "schema.timeseries.ordinary");
extractorAttributes.put("extractor.path", "root.ln.**");

connectorAttributes.put("connector", "iotdb-thrift-connector");
Expand Down Expand Up @@ -154,7 +153,7 @@ public void testTemplateInclusion() throws Exception {
"Database,TTL,SchemaReplicationFactor,DataReplicationFactor,TimePartitionInterval,",
// Receiver's SchemaReplicationFactor/DataReplicationFactor shall be 3/2 regardless of the
// sender
Collections.singleton("root.ln,null,3,2,604800000,"));
Collections.singleton("root.ln,3600000,3,2,604800000,"));
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.**",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
Arrays.asList(
ConfigPhysicalPlanType.CreateDatabase.getPlanType(),
ConfigPhysicalPlanType.SetTTL.getPlanType(),
ConfigPhysicalPlanType.PipeSetTTL.getPlanType(),
ConfigPhysicalPlanType.CreateSchemaTemplate.getPlanType(),
ConfigPhysicalPlanType.CommitSetSchemaTemplate.getPlanType()))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ public class ConfigRegionListeningFilter {
Collections.singletonList(ConfigPhysicalPlanType.DropSchemaTemplate));
OPTION_PLAN_MAP.put(
new PartialPath("schema.timeseries.template.unset"),
Collections.singletonList(ConfigPhysicalPlanType.PipeUnsetTemplate));
Collections.unmodifiableList(
Arrays.asList(
ConfigPhysicalPlanType.UnsetTemplate, ConfigPhysicalPlanType.PipeUnsetTemplate)));

OPTION_PLAN_MAP.put(
new PartialPath("schema.timeseries.ordinary.drop"),
Expand All @@ -96,7 +98,9 @@ public class ConfigRegionListeningFilter {
Collections.singletonList(ConfigPhysicalPlanType.PipeDeactivateTemplate));

OPTION_PLAN_MAP.put(
new PartialPath("schema.ttl"), Collections.singletonList(ConfigPhysicalPlanType.SetTTL));
new PartialPath("schema.ttl"),
Collections.unmodifiableList(
Arrays.asList(ConfigPhysicalPlanType.SetTTL, ConfigPhysicalPlanType.PipeSetTTL)));

OPTION_PLAN_MAP.put(
new PartialPath("auth.role.create"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -269,8 +270,11 @@ public Optional<ConfigPhysicalPlan> visitPipeDeactivateTemplate(
@Override
public Optional<ConfigPhysicalPlan> visitTTL(
final SetTTLPlan setTTLPlan, final IoTDBPipePattern pattern) {
final PartialPath databasePath = new PartialPath(setTTLPlan.getDatabasePathPattern());
final List<PartialPath> intersectionList =
pattern.getIntersection(new PartialPath(setTTLPlan.getDatabasePathPattern()));
pattern.matchPrefixPath(databasePath.getFullPath())
? Collections.singletonList(databasePath)
: pattern.getIntersection(databasePath);
return !intersectionList.isEmpty()
? Optional.of(
new PipeSetTTLPlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,11 @@ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan)
// Will not be actually executed.
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
case PipeUnsetTemplate:
// PipeUnsetTemplate plan will not be written here, and exists only after pipe sender
// collects UnsetTemplatePlan and before receiver calls ConfigManager.
throw new UnsupportedOperationException("PipeUnsetTemplate is not supported.");
case PipeSetTTL:
// PipeUnsetTemplate/PipeSetTTL plan will not be written here, and exists only after pipe
// sender collects UnsetTemplatePlan/SetTTLPlan and before receiver calls ConfigManager.
throw new UnsupportedOperationException(
String.format("Plan type %s is not supported.", physicalPlan.getType()));
case TestOnly:
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,13 +391,13 @@ public void testSetTTL() throws IllegalPathException {
((PipeSetTTLPlan)
IoTDBConfigRegionExtractor.PATTERN_PARSE_VISITOR
.visitTTL(
new SetTTLPlan(Arrays.asList("root", "*", "device", "s1"), Long.MAX_VALUE),
new SetTTLPlan(Arrays.asList("root", "db", "**"), Long.MAX_VALUE),
prefixPathPattern)
.orElseThrow(AssertionError::new))
.getSetTTLPlans();

Assert.assertEquals(
Collections.singletonList(new PartialPath("root.db.device.s1")),
Collections.singletonList(new PartialPath("root.db.device.**")),
plans.stream()
.map(setTTLPlan -> new PartialPath(setTTLPlan.getDatabasePathPattern()))
.collect(Collectors.toList()));
Expand Down

0 comments on commit f1d8fce

Please sign in to comment.