diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java index a6c3f0619c18..1e9ec375ce95 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java @@ -366,7 +366,7 @@ private KafkaSupervisorSpec createKafkaSupervisor(String topic) { return MoreResources.Supervisor.KAFKA_JSON .get() - .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", null, null))) + .withDataSchema(schema -> schema.withTimestamp(TimestampSpec.DEFAULT)) .withIoConfig( ioConfig -> ioConfig .withConsumerProperties(kafkaServer.consumerProperties()) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaIndexDataFormatsTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaIndexDataFormatsTest.java index eb63b791329c..7a701e5e9768 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaIndexDataFormatsTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaIndexDataFormatsTest.java @@ -41,7 +41,7 @@ public SupervisorSpec createSupervisor(String dataSource, String topic, InputFor { return MoreResources.Supervisor.KAFKA_JSON .get() - .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", null, null))) + .withDataSchema(schema -> schema.withTimestamp(TimestampSpec.DEFAULT)) .withIoConfig( ioConfig -> ioConfig .withInputFormat(inputFormat) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java index 38797ae14fc3..c9ec82e60e5d 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java @@ -101,7 +101,7 @@ protected KinesisSupervisorSpec createKinesisSupervisor(KinesisResource kinesis, null, DataSchema.builder() .withDataSource(dataSource) - .withTimestamp(new TimestampSpec("timestamp", null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withGranularity(new UniformGranularitySpec(Granularities.HOUR, null, null)) .withDimensions(DimensionsSpec.EMPTY) .build(), diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java index c60149272242..2c5cfb67d9c3 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java @@ -68,7 +68,7 @@ private KinesisSupervisorSpec createKinesisSupervisorSpec(String dataSource, Str null, DataSchema.builder() .withDataSource(dataSource) - .withTimestamp(new TimestampSpec("timestamp", null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withGranularity(new UniformGranularitySpec(Granularities.HOUR, null, null)) .withDimensions(DimensionsSpec.EMPTY) .build(), diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java index b4a54753d404..c445d52a3838 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java @@ -214,7 +214,7 @@ public void test_postSupervisor_fails_ifRequiredExtensionIsNotLoaded() { final KafkaSupervisorSpec kafkaSupervisor = MoreResources.Supervisor.KAFKA_JSON .get() - .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec(null, null, null))) + .withDataSchema(schema -> schema.withTimestamp(TimestampSpec.DEFAULT)) .withIoConfig(ioConfig -> ioConfig.withConsumerProperties(Map.of("bootstrap.servers", "localhost"))) .build(dataSource, "topic"); diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java index fdf638fa7e3b..5c70348239df 100644 --- a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java +++ b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java @@ -53,8 +53,7 @@ public ThriftInputFormat( { super(flattenSpec); this.thriftJar = thriftJar; - InvalidInput.conditionalException(thriftClass != null, "thriftClass must not be null"); - this.thriftClass = thriftClass; + this.thriftClass = InvalidInput.notNull(thriftClass, "thriftClass"); } @Nullable diff --git a/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputRowParserTest.java b/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputRowParserTest.java index a7309417f84d..8b4266df918f 100644 --- a/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputRowParserTest.java +++ b/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputRowParserTest.java @@ -121,7 +121,7 @@ public void testParse() throws Exception public void testDisableJavaScript() { final JavaScriptParseSpec parseSpec = new JavaScriptParseSpec( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec( DimensionsSpec.getDefaultSchemas( ImmutableList.of( diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java index a9c0aaeec381..a816b3e62bad 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java @@ -192,12 +192,11 @@ private InputEntityReader createReader( { final GenericRecord someAvroDatum = AvroStreamInputFormatTest.buildSomeAvroDatum(); final File someAvroFile = AvroStreamInputFormatTest.createAvroFile(someAvroDatum); - final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null); final DimensionsSpec dimensionsSpec = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of( "eventType"))); final AvroOCFInputFormat inputFormat = new AvroOCFInputFormat(mapper, null, readerSchema, null, null); - final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, ColumnsFilter.all()); + final InputRowSchema schema = new InputRowSchema(TimestampSpec.DEFAULT, dimensionsSpec, ColumnsFilter.all()); final FileEntity entity = new FileEntity(someAvroFile); return inputFormat.createReader(schema, entity, temporaryFolder.newFolder()); } diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java index 562f94e07e1e..53575a248f70 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java @@ -81,7 +81,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest private static final HdfsInputSourceConfig DEFAULT_INPUT_SOURCE_CONFIG = new HdfsInputSourceConfig(null); private static final String COLUMN = "value"; private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema( - new TimestampSpec(null, null, null), + TimestampSpec.DEFAULT, DimensionsSpec.EMPTY, ColumnsFilter.all() ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 1edeca1bf215..4c0ca3284d3a 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -419,7 +419,7 @@ public void testIngestNullColumnAfterDataInserted() throws Exception ); final KafkaIndexTask task = createTask( null, - NEW_DATA_SCHEMA.withDimensionsSpec(dimensionsSpec), + DATA_SCHEMA.withDimensionsSpec(dimensionsSpec), new KafkaIndexTaskIOConfig( 0, "sequence0", @@ -463,7 +463,7 @@ public void testIngestNullColumnAfterDataInserted_storeEmptyColumnsOff_shouldNot final KafkaIndexTask task = createTask( null, - NEW_DATA_SCHEMA.withDimensionsSpec( + DATA_SCHEMA.withDimensionsSpec( new DimensionsSpec( ImmutableList.of( new StringDimensionSchema("dim1"), @@ -667,7 +667,7 @@ public void testIncrementalHandOff() throws Exception Assert.assertTrue( checkpointRequestsHash.contains( Objects.hash( - NEW_DATA_SCHEMA.getDataSource(), + DATA_SCHEMA.getDataSource(), 0, new KafkaDataSourceMetadata(startPartitions) ) @@ -797,7 +797,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception Assert.assertTrue( checkpointRequestsHash.contains( Objects.hash( - NEW_DATA_SCHEMA.getDataSource(), + DATA_SCHEMA.getDataSource(), 0, new KafkaDataSourceMetadata(startPartitions) ) @@ -806,7 +806,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception Assert.assertTrue( checkpointRequestsHash.contains( Objects.hash( - NEW_DATA_SCHEMA.getDataSource(), + DATA_SCHEMA.getDataSource(), 0, new KafkaDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>(topic, currentOffsets, ImmutableSet.of()) @@ -905,7 +905,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception Assert.assertTrue( checkpointRequestsHash.contains( Objects.hash( - NEW_DATA_SCHEMA.getDataSource(), + DATA_SCHEMA.getDataSource(), 0, new KafkaDataSourceMetadata(startPartitions) ) @@ -990,7 +990,7 @@ public void testCheckpointResetWithSameEndOffsets() throws Exception DataSourceMetadata newDataSchemaMetadata() { - return metadataStorageCoordinator.retrieveDataSourceMetadata(NEW_DATA_SCHEMA.getDataSource()); + return metadataStorageCoordinator.retrieveDataSourceMetadata(DATA_SCHEMA.getDataSource()); } @Test(timeout = 60_000L) @@ -1201,7 +1201,7 @@ public void testRunWithTransformSpec() throws Exception { final KafkaIndexTask task = createTask( null, - NEW_DATA_SCHEMA.withTransformSpec( + DATA_SCHEMA.withTransformSpec( new TransformSpec( new SelectorDimFilter("dim1", "b", null), ImmutableList.of( @@ -2714,7 +2714,7 @@ public void testSerde() throws Exception final KafkaIndexTask task = createTask( "taskid", - NEW_DATA_SCHEMA.withTransformSpec( + DATA_SCHEMA.withTransformSpec( new TransformSpec( null, ImmutableList.of(new ExpressionTransform("beep", "nofunc()", ExprMacroTable.nil())) @@ -2747,7 +2747,7 @@ public void testCorrectInputSources() throws Exception final KafkaIndexTask task = createTask( "taskid", - NEW_DATA_SCHEMA.withTransformSpec( + DATA_SCHEMA.withTransformSpec( new TransformSpec( null, ImmutableList.of(new ExpressionTransform("beep", "nofunc()", ExprMacroTable.nil())) @@ -2809,7 +2809,7 @@ private void awaitConsumedOffsets(final KafkaIndexTask task, final Map> scanData(final Task task, QuerySegmentSpec spec) { - ScanQuery query = new Druids.ScanQueryBuilder().dataSource(NEW_DATA_SCHEMA.getDataSource()) + ScanQuery query = new Druids.ScanQueryBuilder().dataSource(DATA_SCHEMA.getDataSource()) .intervals(spec) .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST) .build(); @@ -2866,7 +2866,7 @@ private KafkaIndexTask createTask( final KafkaIndexTaskIOConfig ioConfig ) throws JsonProcessingException { - return createTask(taskId, NEW_DATA_SCHEMA, ioConfig); + return createTask(taskId, DATA_SCHEMA, ioConfig); } private KafkaIndexTask createTask( @@ -2875,7 +2875,7 @@ private KafkaIndexTask createTask( final Map context ) throws JsonProcessingException { - return createTask(taskId, NEW_DATA_SCHEMA, ioConfig, context); + return createTask(taskId, DATA_SCHEMA, ioConfig, context); } private KafkaIndexTask createTask( @@ -2945,7 +2945,7 @@ private KafkaIndexTask createTask( private static DataSchema cloneDataSchema(final DataSchema dataSchema) { - return DataSchema.builder(dataSchema).withObjectMapper(OBJECT_MAPPER).build(); + return DataSchema.builder(dataSchema).build(); } @Override @@ -3374,7 +3374,7 @@ public void testTaskWithTransformSpecDoesNotCauseCliPeonCyclicDependency() final KafkaIndexTask task = createTask( "index_kafka_test_id1", - NEW_DATA_SCHEMA.withTransformSpec( + DATA_SCHEMA.withTransformSpec( new TransformSpec( new SelectorDimFilter("dim1", "b", null), ImmutableList.of( diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index e0e46cf41b22..dc9c71868155 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -364,6 +364,7 @@ private static byte[] jb(String timestamp, String dim1, String dim2, String dimL public void testInvalidKafkaConfig() { KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpecBuilder() + .withDataSchema(DATA_SCHEMA) .withIoConfig( ioConfig -> ioConfig .withJsonInputFormat() @@ -388,6 +389,7 @@ public void testInvalidKafkaConfig() public void testGetInputSourceResources() { KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpecBuilder() + .withDataSchema(DATA_SCHEMA) .withIoConfig( ioConfig -> ioConfig .withJsonInputFormat() diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java index 0e47869018f3..4f6088a8868b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java @@ -74,91 +74,6 @@ public KafkaSupervisorSpecTest() mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules()); } - @Test - public void testSerde() throws IOException - { - String json = "{\n" - + " \"type\": \"kafka\",\n" - + " \"dataSchema\": {\n" - + " \"dataSource\": \"metrics-kafka\",\n" - + " \"parser\": {\n" - + " \"type\": \"string\",\n" - + " \"parseSpec\": {\n" - + " \"format\": \"json\",\n" - + " \"timestampSpec\": {\n" - + " \"column\": \"timestamp\",\n" - + " \"format\": \"auto\"\n" - + " },\n" - + " \"dimensionsSpec\": {\n" - + " \"dimensions\": [],\n" - + " \"dimensionExclusions\": [\n" - + " \"timestamp\",\n" - + " \"value\"\n" - + " ]\n" - + " }\n" - + " }\n" - + " },\n" - + " \"metricsSpec\": [\n" - + " {\n" - + " \"name\": \"count\",\n" - + " \"type\": \"count\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_sum\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleSum\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_min\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleMin\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_max\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleMax\"\n" - + " }\n" - + " ],\n" - + " \"granularitySpec\": {\n" - + " \"type\": \"uniform\",\n" - + " \"segmentGranularity\": \"HOUR\",\n" - + " \"queryGranularity\": \"NONE\"\n" - + " }\n" - + " },\n" - + " \"ioConfig\": {\n" - + " \"topic\": \"metrics\",\n" - + " \"consumerProperties\": {\n" - + " \"bootstrap.servers\": \"localhost:9092\"\n" - + " },\n" - + " \"taskCount\": 1\n" - + " }\n" - + "}"; - KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class); - - Assert.assertNotNull(spec); - Assert.assertNotNull(spec.getDataSchema()); - Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); - Assert.assertNotNull(spec.getIoConfig()); - Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); - Assert.assertNull(spec.getIoConfig().getTopicPattern()); - Assert.assertNotNull(spec.getTuningConfig()); - Assert.assertNull(spec.getContext()); - Assert.assertFalse(spec.isSuspended()); - String serialized = mapper.writeValueAsString(spec); - - // expect default values populated in reserialized string - Assert.assertTrue(serialized.contains("\"tuningConfig\":{")); - Assert.assertTrue(serialized.contains("\"indexSpec\":{")); - Assert.assertTrue(serialized.contains("\"suspended\":false")); - Assert.assertTrue(serialized.contains("\"parser\":{")); - - KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class); - - String stable = mapper.writeValueAsString(spec2); - - Assert.assertEquals(serialized, stable); - } - @Test public void testSerdeWithTopicPattern() throws IOException { @@ -166,22 +81,16 @@ public void testSerdeWithTopicPattern() throws IOException + " \"type\": \"kafka\",\n" + " \"dataSchema\": {\n" + " \"dataSource\": \"metrics-kafka\",\n" - + " \"parser\": {\n" - + " \"type\": \"string\",\n" - + " \"parseSpec\": {\n" - + " \"format\": \"json\",\n" - + " \"timestampSpec\": {\n" - + " \"column\": \"timestamp\",\n" - + " \"format\": \"auto\"\n" - + " },\n" - + " \"dimensionsSpec\": {\n" - + " \"dimensions\": [],\n" - + " \"dimensionExclusions\": [\n" - + " \"timestamp\",\n" - + " \"value\"\n" - + " ]\n" - + " }\n" - + " }\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + " },\n" + " \"metricsSpec\": [\n" + " {\n" @@ -212,6 +121,14 @@ public void testSerdeWithTopicPattern() throws IOException + " },\n" + " \"ioConfig\": {\n" + " \"topicPattern\": \"metrics.*\",\n" + + " \"inputFormat\": {\n" + + " \"type\": \"json\",\n" + + " \"flattenSpec\": {\n" + + " \"useFieldDiscovery\": true,\n" + + " \"fields\": []\n" + + " },\n" + + " \"featureSpec\": {}\n" + + " }," + " \"consumerProperties\": {\n" + " \"bootstrap.servers\": \"localhost:9092\"\n" + " },\n" @@ -325,93 +242,6 @@ public void testSerdeWithInputFormat() throws IOException Assert.assertEquals(serialized, stable); } - @Test - public void testSerdeWithSpec() throws IOException - { - String json = "{\n" - + " \"type\": \"kafka\",\n" - + " \"spec\": {\n" - + " \"dataSchema\": {\n" - + " \"dataSource\": \"metrics-kafka\",\n" - + " \"parser\": {\n" - + " \"type\": \"string\",\n" - + " \"parseSpec\": {\n" - + " \"format\": \"json\",\n" - + " \"timestampSpec\": {\n" - + " \"column\": \"timestamp\",\n" - + " \"format\": \"auto\"\n" - + " },\n" - + " \"dimensionsSpec\": {\n" - + " \"dimensions\": [],\n" - + " \"dimensionExclusions\": [\n" - + " \"timestamp\",\n" - + " \"value\"\n" - + " ]\n" - + " }\n" - + " }\n" - + " },\n" - + " \"metricsSpec\": [\n" - + " {\n" - + " \"name\": \"count\",\n" - + " \"type\": \"count\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_sum\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleSum\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_min\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleMin\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_max\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleMax\"\n" - + " }\n" - + " ],\n" - + " \"granularitySpec\": {\n" - + " \"type\": \"uniform\",\n" - + " \"segmentGranularity\": \"HOUR\",\n" - + " \"queryGranularity\": \"NONE\"\n" - + " }\n" - + " },\n" - + " \"ioConfig\": {\n" - + " \"topic\": \"metrics\",\n" - + " \"consumerProperties\": {\n" - + " \"bootstrap.servers\": \"localhost:9092\"\n" - + " },\n" - + " \"taskCount\": 1\n" - + " }\n" - + " }\n" - + "}"; - KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class); - - Assert.assertNotNull(spec); - Assert.assertNotNull(spec.getDataSchema()); - Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); - Assert.assertNotNull(spec.getIoConfig()); - Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); - Assert.assertNull(spec.getIoConfig().getTopicPattern()); - Assert.assertNotNull(spec.getTuningConfig()); - Assert.assertNull(spec.getContext()); - Assert.assertFalse(spec.isSuspended()); - String serialized = mapper.writeValueAsString(spec); - - // expect default values populated in reserialized string - Assert.assertTrue(serialized.contains("\"tuningConfig\":{")); - Assert.assertTrue(serialized.contains("\"indexSpec\":{")); - Assert.assertTrue(serialized.contains("\"suspended\":false")); - Assert.assertTrue(serialized.contains("\"parser\":{")); - - KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class); - - String stable = mapper.writeValueAsString(spec2); - - Assert.assertEquals(serialized, stable); - } - @Test public void testSerdeWithSpecAndInputFormat() throws IOException { @@ -508,22 +338,16 @@ public void testSuspendResume() throws IOException + " \"type\": \"kafka\",\n" + " \"dataSchema\": {\n" + " \"dataSource\": \"metrics-kafka\",\n" - + " \"parser\": {\n" - + " \"type\": \"string\",\n" - + " \"parseSpec\": {\n" - + " \"format\": \"json\",\n" - + " \"timestampSpec\": {\n" - + " \"column\": \"timestamp\",\n" - + " \"format\": \"auto\"\n" - + " },\n" - + " \"dimensionsSpec\": {\n" - + " \"dimensions\": [],\n" - + " \"dimensionExclusions\": [\n" - + " \"timestamp\",\n" - + " \"value\"\n" - + " ]\n" - + " }\n" - + " }\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + " },\n" + " \"metricsSpec\": [\n" + " {\n" @@ -554,6 +378,14 @@ public void testSuspendResume() throws IOException + " },\n" + " \"ioConfig\": {\n" + " \"topic\": \"metrics\",\n" + + " \"inputFormat\": {\n" + + " \"type\": \"json\",\n" + + " \"flattenSpec\": {\n" + + " \"useFieldDiscovery\": true,\n" + + " \"fields\": []\n" + + " },\n" + + " \"featureSpec\": {}\n" + + " }," + " \"consumerProperties\": {\n" + " \"bootstrap.servers\": \"localhost:9092\"\n" + " },\n" @@ -676,7 +508,7 @@ public void test_validateSpecUpdateTo() KafkaSupervisorSpec validDestSpec = new KafkaSupervisorSpecBuilder() .withDataSchema( schema -> schema - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withAggregators(new CountAggregatorFactory("rows")) .withGranularity(new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)) ) @@ -694,7 +526,7 @@ private KafkaSupervisorSpec getSpec(String topic, String topicPattern) KafkaSupervisorSpecBuilder builder = new KafkaSupervisorSpecBuilder() .withDataSchema( schema -> schema - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withAggregators(new CountAggregatorFactory("rows")) .withGranularity(new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)) ) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java index 84b843958ebb..76b08bb7b853 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java @@ -24,6 +24,7 @@ import com.google.inject.Module; import com.google.inject.name.Names; import org.apache.druid.common.aws.AWSCredentialsConfig; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; import org.apache.druid.indexing.common.task.TestAppenderatorsManager; @@ -52,7 +53,7 @@ public class KinesisIndexTaskSerdeTest { private static final DataSchema DATA_SCHEMA = - DataSchema.builder().withDataSource("dataSource").build(); + DataSchema.builder().withDataSource("dataSource").withTimestamp(TimestampSpec.DEFAULT).build(); private static final KinesisIndexTaskTuningConfig TUNING_CONFIG = new KinesisIndexTaskTuningConfig( null, null, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index a0337fa95d44..ed96e947345d 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -385,7 +385,7 @@ public void testIngestNullColumnAfterDataInserted() throws Exception ) ); final KinesisIndexTask task = createTask( - NEW_DATA_SCHEMA.withDimensionsSpec(dimensionsSpec), + DATA_SCHEMA.withDimensionsSpec(dimensionsSpec), ImmutableMap.of(SHARD_ID1, "2"), ImmutableMap.of(SHARD_ID1, "4") ); @@ -424,7 +424,7 @@ public void testIngestNullColumnAfterDataInserted_storeEmptyColumnsOff_shouldNot replayAll(); final KinesisIndexTask task = createTask( - NEW_DATA_SCHEMA.withDimensionsSpec( + DATA_SCHEMA.withDimensionsSpec( new DimensionsSpec( ImmutableList.of( new StringDimensionSchema("dim1"), @@ -453,59 +453,9 @@ public void testIngestNullColumnAfterDataInserted_storeEmptyColumnsOff_shouldNot } } - @Test(timeout = 120_000L) - public void testRunAfterDataInsertedWithLegacyParser() throws Exception - { - recordSupplier.assign(EasyMock.anyObject()); - EasyMock.expectLastCall().anyTimes(); - - EasyMock.expect(recordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); - - recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); - EasyMock.expectLastCall().anyTimes(); - - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())) - .andReturn(clone(RECORDS, 2, 5)).once(); - - recordSupplier.close(); - EasyMock.expectLastCall().once(); - - replayAll(); - - final KinesisIndexTask task = createTask( - OLD_DATA_SCHEMA, - ImmutableMap.of(SHARD_ID1, "2"), - ImmutableMap.of(SHARD_ID1, "4") - ); - - final ListenableFuture future = runTask(task); - - // Wait for task to exit - Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); - - verifyAll(); - verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSize(RECORDS, 2, 5)) - .totalProcessed(3)); - - // Check published metadata and segments in deep storage - assertEqualsExceptVersion( - ImmutableList.of( - sdd("2010/P1D", 0, ImmutableList.of("c")), - sdd("2011/P1D", 0, ImmutableList.of("d", "e")) - ), - publishedDescriptors() - ); - Assert.assertEquals( - new KinesisDataSourceMetadata( - new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "4")) - ), - newDataSchemaMetadata() - ); - } - DataSourceMetadata newDataSchemaMetadata() { - return metadataStorageCoordinator.retrieveDataSourceMetadata(NEW_DATA_SCHEMA.getDataSource()); + return metadataStorageCoordinator.retrieveDataSourceMetadata(DATA_SCHEMA.getDataSource()); } @Test(timeout = 120_000L) @@ -620,7 +570,7 @@ public void testIncrementalHandOff() throws Exception Assert.assertTrue( checkpointRequestsHash.contains( Objects.hash( - NEW_DATA_SCHEMA.getDataSource(), + DATA_SCHEMA.getDataSource(), 0, new KinesisDataSourceMetadata(startPartitions) ) @@ -712,7 +662,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception Assert.assertTrue( checkpointRequestsHash.contains( Objects.hash( - NEW_DATA_SCHEMA.getDataSource(), + DATA_SCHEMA.getDataSource(), 0, new KinesisDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>(STREAM, startOffsets, Collections.emptySet()) @@ -723,7 +673,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception Assert.assertTrue( checkpointRequestsHash.contains( Objects.hash( - NEW_DATA_SCHEMA.getDataSource(), + DATA_SCHEMA.getDataSource(), 0, new KinesisDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>(STREAM, currentOffsets, currentOffsets.keySet())) @@ -907,7 +857,7 @@ public void testRunWithTransformSpec() throws Exception replayAll(); final KinesisIndexTask task = createTask( - NEW_DATA_SCHEMA.withTransformSpec( + DATA_SCHEMA.withTransformSpec( new TransformSpec( new SelectorDimFilter("dim1", "b", null), ImmutableList.of( @@ -1473,7 +1423,7 @@ public void testRunConflictingWithoutTransactions() throws Exception final KinesisIndexTask task1 = createTask( 0, null, - NEW_DATA_SCHEMA, + DATA_SCHEMA, ImmutableMap.of(SHARD_ID1, "2"), ImmutableMap.of(SHARD_ID1, "4"), false @@ -1481,7 +1431,7 @@ public void testRunConflictingWithoutTransactions() throws Exception final KinesisIndexTask task2 = createTask( 1, null, - NEW_DATA_SCHEMA, + DATA_SCHEMA, ImmutableMap.of(SHARD_ID1, "3"), ImmutableMap.of(SHARD_ID1, "9"), false @@ -1953,7 +1903,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception final KinesisIndexTask task = createTask( "task1", - NEW_DATA_SCHEMA, + DATA_SCHEMA, new KinesisIndexTaskIOConfig( 0, "sequence0", @@ -2112,7 +2062,7 @@ public void testSequencesFromContext() throws IOException final KinesisIndexTask task = createTask( "task1", - NEW_DATA_SCHEMA, + DATA_SCHEMA, new KinesisIndexTaskIOConfig( 0, "sequence0", @@ -2345,7 +2295,7 @@ private KinesisIndexTask createTask( Map endSequenceNumbers ) throws JsonProcessingException { - return createTask(groupId, taskId, NEW_DATA_SCHEMA, startSequenceNumbers, endSequenceNumbers, true); + return createTask(groupId, taskId, DATA_SCHEMA, startSequenceNumbers, endSequenceNumbers, true); } private KinesisIndexTask createTask( @@ -2360,7 +2310,7 @@ private KinesisIndexTask createTask( private KinesisIndexTask createTask(KinesisIndexTaskIOConfig ioConfig) throws JsonProcessingException { - return createTask(null, NEW_DATA_SCHEMA, ioConfig, null); + return createTask(null, DATA_SCHEMA, ioConfig, null); } private KinesisIndexTask createTask( @@ -2436,7 +2386,7 @@ private KinesisIndexTask createTask( private static DataSchema cloneDataSchema(final DataSchema dataSchema) { - return DataSchema.builder(dataSchema).withObjectMapper(OBJECT_MAPPER).build(); + return DataSchema.builder(dataSchema).build(); } @Override diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java index 8a76341a237e..ffed3e2d6a5f 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java @@ -67,7 +67,7 @@ public static Task getTask() new IndexTask.IndexIngestionSpec( DataSchema.builder() .withDataSource("foo") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(DimensionsSpec.EMPTY) .withAggregators(new DoubleSumAggregatorFactory("met", "met")) .withGranularity( diff --git a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java index 3f8d6c38c97e..2c8f3f9907b3 100644 --- a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java +++ b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java @@ -62,7 +62,7 @@ public class OrcReaderTest extends InitializedNullHandlingTest public void testTest1() throws IOException { final InputEntityReader reader = createReader( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("col1", "col2"))), new OrcInputFormat(null, null, new Configuration()), "example/test_1.orc" @@ -90,7 +90,7 @@ public void testTest2() throws IOException new Configuration() ); final InputEntityReader reader = createReader( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec(null), inputFormat, "example/test_2.orc" @@ -536,7 +536,7 @@ public void testListMap() throws IOException new Configuration() ); final InputEntityReader reader = createReader( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec( ImmutableList.of( AutoTypeColumnSchema.of("a"), @@ -602,7 +602,7 @@ public void testNestedArray() throws IOException new Configuration() ); final InputEntityReader reader = createReader( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec( ImmutableList.of( AutoTypeColumnSchema.of("a"), @@ -677,7 +677,7 @@ public void testSimpleNullValues() throws IOException new Configuration() ); final InputEntityReader reader = createReader( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec( ImmutableList.of( new StringDimensionSchema("c1"), diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetReaderTest.java index a3dea3dbd265..5fc57eedee82 100644 --- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetReaderTest.java +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetReaderTest.java @@ -65,7 +65,7 @@ public void testFlat1NoFlattenSpec() throws IOException { final String file = "example/flattening/test_flat_1.parquet"; InputRowSchema schema = new InputRowSchema( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "listDim"))), ColumnsFilter.all() ); @@ -99,7 +99,7 @@ public void testFlat1Autodiscover() throws IOException { final String file = "example/flattening/test_flat_1.parquet"; InputRowSchema schema = new InputRowSchema( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())), ColumnsFilter.all() ); @@ -132,7 +132,7 @@ public void testFlat1Flatten() throws IOException { final String file = "example/flattening/test_flat_1.parquet"; InputRowSchema schema = new InputRowSchema( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "list"))), ColumnsFilter.all() ); @@ -173,7 +173,7 @@ public void testFlat1FlattenSelectListItem() throws IOException { final String file = "example/flattening/test_flat_1.parquet"; InputRowSchema schema = new InputRowSchema( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "listExtracted"))), ColumnsFilter.all() ); @@ -213,7 +213,7 @@ public void testNested1NoFlattenSpec() throws IOException { final String file = "example/flattening/test_nested_1.parquet"; InputRowSchema schema = new InputRowSchema( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1"))), ColumnsFilter.all() ); @@ -249,7 +249,7 @@ public void testNested1Autodiscover() throws IOException { final String file = "example/flattening/test_nested_1.parquet"; InputRowSchema schema = new InputRowSchema( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())), ColumnsFilter.all() ); @@ -282,7 +282,7 @@ public void testNested1Flatten() throws IOException { final String file = "example/flattening/test_nested_1.parquet"; InputRowSchema schema = new InputRowSchema( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())), ColumnsFilter.all() ); @@ -325,7 +325,7 @@ public void testNested1FlattenSelectListItem() throws IOException { final String file = "example/flattening/test_nested_1.parquet"; InputRowSchema schema = new InputRowSchema( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())), ColumnsFilter.all() ); @@ -366,7 +366,7 @@ public void testFlattenNullableListNullableElements() throws IOException { final String file = "example/flattening/nullable_list.snappy.parquet"; InputRowSchema schema = new InputRowSchema( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec( ImmutableList.of() ), diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java index 8bb2690e404d..aa28848fc2ed 100644 --- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java @@ -48,7 +48,7 @@ public void testNestedColumnTransformsNestedTestFile() throws IOException { final String file = "example/flattening/test_nested_1.parquet"; InputRowSchema schema = new InputRowSchema( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec( ImmutableList.of( AutoTypeColumnSchema.of("nestedData"), @@ -102,7 +102,7 @@ public void testNestedColumnTransformsNestedNullableListFile() throws IOExceptio { final String file = "example/flattening/nullable_list.snappy.parquet"; InputRowSchema schema = new InputRowSchema( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec( ImmutableList.of( AutoTypeColumnSchema.of("a1"), @@ -168,7 +168,7 @@ public void testNestedColumnSchemalessNestedTestFileNoNested() throws IOExceptio { final String file = "example/flattening/test_nested_1.parquet"; InputRowSchema schema = new InputRowSchema( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, DimensionsSpec.builder().useSchemaDiscovery(false).build(), ColumnsFilter.all(), null @@ -205,7 +205,7 @@ public void testNestedColumnSchemalessNestedTestFile() throws IOException { final String file = "example/flattening/test_nested_1.parquet"; InputRowSchema schema = new InputRowSchema( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, DimensionsSpec.builder().useSchemaDiscovery(true).build(), ColumnsFilter.all(), null diff --git a/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java b/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java index 6cb9d77721cf..63decde23a53 100644 --- a/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java +++ b/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java @@ -29,6 +29,7 @@ import org.apache.druid.client.coordinator.CoordinatorClientImpl; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.discovery.NodeRole; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory; @@ -204,7 +205,7 @@ public void test_parallelIndexSupervisorTask_withDruidInputSource_hasNoCircularD null, null, new ParallelIndexIngestionSpec( - DataSchema.builder().withDataSource("test").build(), + DataSchema.builder().withDataSource("test").withTimestamp(TimestampSpec.DEFAULT).build(), ioConfig, ParallelIndexTuningConfig.defaultConfig() ), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 7e8999d3f9ae..abbbf26400f0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -27,7 +27,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; @@ -36,10 +35,9 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.Rows; +import org.apache.druid.error.InvalidInput; import org.apache.druid.hll.HyperLogLogCollector; -import org.apache.druid.indexer.Checks; import org.apache.druid.indexer.IngestionState; -import org.apache.druid.indexer.Property; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec; import org.apache.druid.indexer.granularity.GranularitySpec; @@ -1060,9 +1058,7 @@ public IndexIngestionSpec( { super(dataSchema, ioConfig, tuningConfig); - if (dataSchema.getParserMap() != null && ioConfig.getInputSource() != null) { - throw new IAE("Cannot use parser and inputSource together. Try using inputFormat instead of parser."); - } + InvalidInput.notNull(ioConfig.getInputSource(), "inputSource"); IngestionMode ingestionMode = AbstractTask.computeBatchIngestionMode(ioConfig); @@ -1072,13 +1068,8 @@ public IndexIngestionSpec( throw new IAE("GranularitySpec's intervals cannot be empty for replace."); } - if (ioConfig.getInputSource() != null && ioConfig.getInputSource().needsFormat()) { - Checks.checkOneNotNullOrEmpty( - ImmutableList.of( - new Property<>("parser", dataSchema.getParserMap()), - new Property<>("inputFormat", ioConfig.getInputFormat()) - ) - ); + if (ioConfig.getInputSource().needsFormat()) { + InvalidInput.notNull(ioConfig.getInputFormat(), "inputFormat"); } this.dataSchema = dataSchema; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java index 60a164560d55..0df1a4c73ee6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java @@ -21,10 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; -import org.apache.druid.indexer.Checks; -import org.apache.druid.indexer.Property; -import org.apache.druid.java.util.common.IAE; +import org.apache.druid.error.InvalidInput; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.IngestionSpec; @@ -43,16 +40,9 @@ public ParallelIndexIngestionSpec( { super(dataSchema, ioConfig, tuningConfig); - if (dataSchema.getParserMap() != null && ioConfig.getInputSource() != null) { - throw new IAE("Cannot use parser and inputSource together. Try using inputFormat instead of parser."); - } - if (ioConfig.getInputSource() != null && ioConfig.getInputSource().needsFormat()) { - Checks.checkOneNotNullOrEmpty( - ImmutableList.of( - new Property<>("parser", dataSchema.getParserMap()), - new Property<>("inputFormat", ioConfig.getInputFormat()) - ) - ); + InvalidInput.notNull(ioConfig.getInputSource(), "inputSource"); + if (ioConfig.getInputSource().needsFormat()) { + InvalidInput.notNull(ioConfig.getInputFormat(), "inputFormat"); } this.dataSchema = dataSchema; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java index 5541fb1374b9..38a79684dcbd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java @@ -71,7 +71,7 @@ public class InputSourceSampler private static final DataSchema DEFAULT_DATA_SCHEMA = DataSchema.builder() .withDataSource(SAMPLER_DATA_SOURCE) - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(DimensionsSpec.builder().build()) .build(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkReader.java index fb24323baad8..759c92b1c4b8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkReader.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkReader.java @@ -65,7 +65,7 @@ class StreamChunkReader ParseExceptionHandler parseExceptionHandler ) { - InvalidInput.conditionalException(inputFormat != null, "inputFormat must not be null"); + InvalidInput.notNull(inputFormat, "inputFormat"); this.byteEntityReader = new SettableByteEntityReader<>( inputFormat, inputRowSchema, @@ -85,8 +85,7 @@ class StreamChunkReader ParseExceptionHandler parseExceptionHandler ) { - InvalidInput.conditionalException(byteEntityReader != null, "byteEntityReader must not be null"); - this.byteEntityReader = byteEntityReader; + this.byteEntityReader = InvalidInput.notNull(byteEntityReader, "byteEntityReader"); this.rowFilter = rowFilter; this.rowIngestionMeters = rowIngestionMeters; this.parseExceptionHandler = parseExceptionHandler; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java index 69537c8627cb..db4f18096625 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.LocalInputSource; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexing.common.actions.TaskAction; @@ -61,7 +62,7 @@ public TestIndexTask( id, taskResource, new IndexIngestionSpec( - DataSchema.builder().withDataSource(dataSource).withObjectMapper(mapper).build(), + DataSchema.builder().withDataSource(dataSource).withTimestamp(TimestampSpec.DEFAULT).build(), new IndexTask.IndexIOConfig( new LocalInputSource(new File("lol"), "rofl"), new JsonInputFormat(null, null, null, null, null), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexIngestionSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexIngestionSpecTest.java deleted file mode 100644 index 8076fc926156..000000000000 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexIngestionSpecTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common.task; - -import com.google.common.collect.ImmutableMap; -import org.apache.druid.data.input.impl.NoopInputFormat; -import org.apache.druid.data.input.impl.NoopInputSource; -import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec; -import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig; -import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec; -import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.segment.indexing.DataSchema; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -public class IndexIngestionSpecTest -{ - @Rule - public ExpectedException expectedException = ExpectedException.none(); - - @Test - public void testParserAndInputFormat() - { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "Cannot use parser and inputSource together. Try using inputFormat instead of parser." - ); - final IndexIngestionSpec spec = new IndexIngestionSpec( - DataSchema.builder() - .withDataSource("dataSource") - .withParserMap(ImmutableMap.of("fake", "parser map")) - .withGranularity(new ArbitraryGranularitySpec(Granularities.NONE, null)) - .build(), - new IndexIOConfig( - new NoopInputSource(), - new NoopInputFormat(), - null, - null - ), - null - ); - } - - @Test - public void testParserAndInputSource() - { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Cannot use parser and inputSource together."); - final IndexIngestionSpec spec = new IndexIngestionSpec( - DataSchema.builder() - .withDataSource("dataSource") - .withParserMap(ImmutableMap.of("fake", "parser map")) - .withGranularity(new ArbitraryGranularitySpec(Granularities.NONE, null)) - .build(), - new IndexIOConfig( - new NoopInputSource(), - null, - null, - null - ), - null - ); - } -} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 9a2eeb8c1a66..251ca46601c4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -2720,7 +2720,6 @@ private static IndexIngestionSpec createIngestionSpec( ) ) .withTransform(transformSpec) - .withObjectMapper(objectMapper) .build(), new IndexIOConfig( new LocalInputSource(baseDir, "druid*"), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index 5064d915c5a9..99e12bf2c304 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -218,7 +218,7 @@ public void testIndexTaskSerde() throws Exception new IndexIngestionSpec( DataSchema.builder() .withDataSource("foo") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(DimensionsSpec.EMPTY) .withAggregators(new DoubleSumAggregatorFactory("met", "met")) .withGranularity( @@ -287,7 +287,7 @@ public void testIndexTaskwithResourceSerde() throws Exception new IndexIngestionSpec( DataSchema.builder() .withDataSource("foo") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(DimensionsSpec.EMPTY) .withAggregators(new DoubleSumAggregatorFactory("met", "met")) .withGranularity( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 6d2b7eded2a5..4652ecc5444c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -27,9 +27,7 @@ import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InlineInputSource; -import org.apache.druid.data.input.impl.JSONParseSpec; import org.apache.druid.data.input.impl.JsonInputFormat; -import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; @@ -38,8 +36,6 @@ import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.task.TuningConfigBuilder; -import org.apache.druid.jackson.DefaultObjectMapper; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.rpc.HttpResponseException; @@ -265,7 +261,7 @@ public void testFailToConstructWhenBothAppendToExistingAndForceGuaranteedRollupA final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec( DataSchema.builder() .withDataSource("datasource") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(DimensionsSpec.EMPTY) .build(), ioConfig, @@ -281,69 +277,6 @@ public void testFailToConstructWhenBothAppendToExistingAndForceGuaranteedRollupA null ); } - - @Test - public void testFailToConstructWhenBothInputSourceAndParserAreSet() - { - final ObjectMapper mapper = new DefaultObjectMapper(); - final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig( - new InlineInputSource("test"), - null, - false, - null - ); - final ParallelIndexTuningConfig tuningConfig = TuningConfigBuilder - .forParallelIndexTask() - .withMaxRowsInMemory(10) - .withMaxBytesInMemory(1000L) - .withPartitionsSpec(new HashedPartitionsSpec(null, 10, null)) - .withIndexSpec( - IndexSpec.builder() - .withBitmapSerdeFactory(RoaringBitmapSerdeFactory.getInstance()) - .withDimensionCompression(CompressionStrategy.UNCOMPRESSED) - .withMetricCompression(CompressionStrategy.LZF) - .withLongEncoding(LongEncodingStrategy.LONGS) - .build() - ) - .withIndexSpecForIntermediatePersists(IndexSpec.getDefault()) - .withMaxPendingPersists(1) - .withForceGuaranteedRollup(true) - .withReportParseExceptions(true) - .withPushTimeout(10000L) - .withSegmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .withMaxNumConcurrentSubTasks(10) - .withMaxRetry(100) - .withTaskStatusCheckPeriodMs(20L) - .withChatHandlerTimeout(new Duration(3600)) - .withChatHandlerNumRetries(128) - .withLogParseExceptions(false) - .build(); - - expectedException.expect(IAE.class); - expectedException.expectMessage("Cannot use parser and inputSource together. Try using inputFormat instead of parser."); - new ParallelIndexIngestionSpec( - DataSchema.builder() - .withDataSource("datasource") - .withParserMap( - mapper.convertValue( - new StringInputRowParser( - new JSONParseSpec( - new TimestampSpec(null, null, null), - DimensionsSpec.EMPTY, - null, - null, - null - ) - ), - Map.class - ) - ) - .withObjectMapper(mapper) - .build(), - ioConfig, - tuningConfig - ); - } } public static class StaticUtilsTest @@ -557,7 +490,7 @@ public void testCompactionTaskDoesntCleanup() throws Exception final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec( DataSchema.builder() .withDataSource("datasource") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(DimensionsSpec.EMPTY) .build(), ioConfig, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java index 091e7b2d7cef..481314896b1e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java @@ -100,7 +100,6 @@ static DataSchema createDataSchema(List granularitySpecInputIntervals) .withTimestamp(timestampSpec) .withDimensions(dimensionsSpec) .withGranularity(granularitySpec) - .withObjectMapper(NESTED_OBJECT_MAPPER) .build(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java index 577dce1255d0..1856e5d81adf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java @@ -49,7 +49,7 @@ public class SinglePhaseSubTaskSpecTest new ParallelIndexIngestionSpec( DataSchema.builder() .withDataSource("dataSource") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(DimensionsSpec.builder().build()) .build(), new ParallelIndexIOConfig( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java index 9898dda0b3bd..084cbdef1502 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.granularity.GranularitySpec; import org.apache.druid.indexer.granularity.UniformGranularitySpec; import org.apache.druid.indexing.common.actions.LockListAction; @@ -67,7 +68,11 @@ public void noTombstonesWhenNoDataInInputIntervalAndNoExistingSegments() throws GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, null, false, Collections.singletonList(interval) ); - DataSchema dataSchema = DataSchema.builder().withDataSource("test").withGranularity(granularitySpec).build(); + DataSchema dataSchema = DataSchema.builder() + .withDataSource("test") + .withTimestamp(TimestampSpec.DEFAULT) + .withGranularity(granularitySpec) + .build(); // no segments will be pushed when all rows are thrown away, assume that: List pushedSegments = Collections.emptyList(); @@ -92,7 +97,11 @@ public void tombstonesCreatedWhenNoDataInInputIntervalAndExistingSegments() thro GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, null, false, Collections.singletonList(interval) ); - DataSchema dataSchema = DataSchema.builder().withDataSource("test").withGranularity(granularitySpec).build(); + DataSchema dataSchema = DataSchema.builder() + .withDataSource("test") + .withTimestamp(TimestampSpec.DEFAULT) + .withGranularity(granularitySpec) + .build(); // no segments will be pushed when all rows are thrown away, assume that: List pushedSegments = Collections.emptyList(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java index 4fc77934f60e..5e4ecd657cd2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java @@ -235,7 +235,7 @@ public void testReaderColumnsFilterWithMetricGiven() String metricName = "m1"; ColumnsFilter originalColumnsFilter = ColumnsFilter.inclusionBased(ImmutableSet.of(column)); InputRowSchema inputRowSchema = new InputRowSchema( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec( DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b")) ), @@ -269,7 +269,7 @@ public void testReaderColumnsFilterWithNoMetricGiven() String metricName = "m1"; ColumnsFilter originalColumnsFilter = ColumnsFilter.inclusionBased(ImmutableSet.of(column)); InputRowSchema inputRowSchema = new InputRowSchema( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec( DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b")) ), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/GeneratorInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/GeneratorInputSourceTest.java index 94aecc2f443c..47052deb3d6e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/input/GeneratorInputSourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/GeneratorInputSourceTest.java @@ -133,7 +133,7 @@ public void testReader() throws IOException ); InputRowSchema rowSchema = new InputRowSchema( - new TimestampSpec(null, null, null), + TimestampSpec.DEFAULT, DimensionsSpec.builder().useSchemaDiscovery(true).build(), null ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java index 5815f043c51d..e605b66cf960 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java @@ -112,7 +112,7 @@ public void test_createColumnsFilter_schemaless() @Test public void testFromDataSchema() { - TimestampSpec timestampSpec = new TimestampSpec(null, null, null); + TimestampSpec timestampSpec = TimestampSpec.DEFAULT; DimensionsSpec dimensionsSpec = new DimensionsSpec( Arrays.asList( new StringDimensionSchema("d1"), @@ -125,7 +125,7 @@ public void testFromDataSchema() DataSchema schema = DataSchema.builder() .withDataSource("dataSourceName") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(dimensionsSpec) .withAggregators( new CountAggregatorFactory("count"), @@ -146,7 +146,7 @@ public void testFromDataSchema() @Test public void testFromDataSchemaWithNoAggregator() { - TimestampSpec timestampSpec = new TimestampSpec(null, null, null); + TimestampSpec timestampSpec = TimestampSpec.DEFAULT; DimensionsSpec dimensionsSpec = new DimensionsSpec( Arrays.asList( new StringDimensionSchema("d1"), @@ -158,7 +158,7 @@ public void testFromDataSchemaWithNoAggregator() ); DataSchema schema = DataSchema.builder() .withDataSource("dataSourceName") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(dimensionsSpec) .withGranularity(new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null)) .build(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java index 9b54725293a7..103d57e13437 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java @@ -612,7 +612,7 @@ public void testTaskCommandIncludesServerPriorityIfConfigured() throws Exception null, DataSchema.builder() .withDataSource("foo") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(new DimensionsSpec(List.of())) .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), List.of())) .build(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 3a01e9f43cad..3d086c0cc977 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -669,7 +669,7 @@ public void testIndexTask() new IndexIngestionSpec( DataSchema.builder() .withDataSource("foo") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(DimensionsSpec.EMPTY) .withAggregators(new DoubleSumAggregatorFactory("met", "met")) .withGranularity( @@ -733,6 +733,7 @@ public void testIndexTaskFailure() new IndexIngestionSpec( DataSchema.builder() .withDataSource("foo") + .withTimestamp(TimestampSpec.DEFAULT) .withAggregators(new DoubleSumAggregatorFactory("met", "met")) .withGranularity( new UniformGranularitySpec( @@ -741,7 +742,6 @@ public void testIndexTaskFailure() ImmutableList.of(Intervals.of("2010-01-01/P1D")) ) ) - .withObjectMapper(mapper) .build(), new IndexIOConfig(new MockExceptionInputSource(), new NoopInputFormat(), false, false), TuningConfigBuilder.forIndexTask() @@ -1165,7 +1165,7 @@ public void testResumeTasks() throws Exception new IndexIngestionSpec( DataSchema.builder() .withDataSource("foo") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(DimensionsSpec.EMPTY) .withAggregators(new DoubleSumAggregatorFactory("met", "met")) .withGranularity( @@ -1254,7 +1254,7 @@ public void testUnifiedAppenderatorsManagerCleanup() throws Exception new IndexIngestionSpec( DataSchema.builder() .withDataSource("foo") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(DimensionsSpec.EMPTY) .withAggregators(new DoubleSumAggregatorFactory("met", "met")) .withGranularity( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 388af3bfe910..33f80da67e03 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -592,7 +592,7 @@ public void testGetActiveTaskRedactsPassword() throws JsonProcessingException final DataSchema dataSchema = DataSchema.builder() .withDataSource("DS") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(DimensionsSpec.builder().build()) .withGranularity( new UniformGranularitySpec(Granularities.YEAR, Granularities.DAY, null) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java index 7ac8b2b746ab..3c22a6c4c6d1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java @@ -306,7 +306,7 @@ private static Task createTestTask(String id, @Nullable String supervisorId, Str null, DataSchema.builder() .withDataSource(datasource) - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(new DimensionsSpec(Collections.emptyList())) .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) .build(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java index ea4ffb16af5b..27fde352caba 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java @@ -311,7 +311,7 @@ private static Task createTestTask(String id, @Nullable String supervisorId, Str null, DataSchema.builder() .withDataSource(datasource) - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(new DimensionsSpec(Collections.emptyList())) .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) .build(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java index e63d3dbd6e5c..cbc1ca7c7a61 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java @@ -116,7 +116,7 @@ public Authorizer getAuthorizer(String name) DataSchema dataSchema = DataSchema.builder() .withDataSource("datasource") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(new DimensionsSpec(Collections.emptyList())) .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) .build(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java index 7ceae8451703..29dcc9ea37b3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java @@ -73,7 +73,7 @@ public void testWithinMinMaxTime() DataSchema schema = DataSchema.builder() .withDataSource("datasource") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(dimensionsSpec) .withGranularity( new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null) @@ -128,7 +128,7 @@ public void testWithinMinMaxTimeNotPopulated() DataSchema schema = DataSchema.builder() .withDataSource("datasource") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(dimensionsSpec) .withGranularity( new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null) @@ -183,7 +183,7 @@ public void testEnsureRowRejectionReasonForNullRow() DataSchema schema = DataSchema.builder() .withDataSource("datasource") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(dimensionsSpec) .withGranularity( new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null) @@ -229,7 +229,7 @@ public void testGetSupervisorId() DataSchema schema = DataSchema.builder() .withDataSource("datasource") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(dimensionsSpec) .withGranularity( new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 9d83ac24d2b8..0970c06cbf14 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.base.Predicates; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; @@ -36,13 +35,10 @@ import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.impl.ByteEntity; -import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.FloatDimensionSchema; -import org.apache.druid.data.input.impl.JSONParseSpec; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; -import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.DruidNodeAnnouncer; @@ -149,7 +145,6 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -157,7 +152,6 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.Executor; @@ -174,8 +168,7 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule(); protected static final ObjectMapper OBJECT_MAPPER; - protected static final DataSchema OLD_DATA_SCHEMA; - protected static final DataSchema NEW_DATA_SCHEMA = + protected static final DataSchema DATA_SCHEMA = DataSchema.builder() .withDataSource("test_ds") .withTimestamp(new TimestampSpec("timestamp", "iso", null)) @@ -217,39 +210,6 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport static { OBJECT_MAPPER = new TestUtils().getTestObjectMapper(); - OBJECT_MAPPER.registerSubtypes(new NamedType(JSONParseSpec.class, "json")); - OLD_DATA_SCHEMA = DataSchema.builder() - .withDataSource("test_ds") - .withParserMap( - OBJECT_MAPPER.convertValue( - new StringInputRowParser( - new JSONParseSpec( - new TimestampSpec("timestamp", "iso", null), - new DimensionsSpec( - Arrays.asList( - new StringDimensionSchema("dim1"), - new StringDimensionSchema("dim1t"), - new StringDimensionSchema("dim2"), - new LongDimensionSchema("dimLong"), - new FloatDimensionSchema("dimFloat") - ) - ), - new JSONPathSpec(true, ImmutableList.of()), - ImmutableMap.of(), - false - ), - StandardCharsets.UTF_8.name() - ), - Map.class - ) - ) - .withAggregators( - new DoubleSumAggregatorFactory("met1sum", "met1"), - new CountAggregatorFactory("rows") - ) - .withGranularity(new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)) - .withObjectMapper(OBJECT_MAPPER) - .build(); } public SeekableStreamIndexTaskTestBase( @@ -268,7 +228,7 @@ public void setupBase() } @After - public void tearDownBase() throws IOException + public void tearDownBase() { emitter.close(); } @@ -383,7 +343,7 @@ protected List readSegmentColumn(final String column, final SegmentDescr StringUtils.format( "%s/%s/%s_%s/%s/%d", getSegmentDirectory(), - OLD_DATA_SCHEMA.getDataSource(), + DATA_SCHEMA.getDataSource(), descriptor.getInterval().getStart(), descriptor.getInterval().getEnd(), descriptor.getVersion(), @@ -538,7 +498,7 @@ protected long countEvents(final Task task) { // Do a query. TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() - .dataSource(OLD_DATA_SCHEMA.getDataSource()) + .dataSource(DATA_SCHEMA.getDataSource()) .aggregators( ImmutableList.of( new LongSumAggregatorFactory("rows", "rows") @@ -565,7 +525,7 @@ protected void unlockAppenderatorBasePersistDirForTask(SeekableStreamIndexTask t protected Collection publishedSegments() { return metadataStorageCoordinator - .retrieveAllUsedSegments(OLD_DATA_SCHEMA.getDataSource(), Segments.ONLY_VISIBLE); + .retrieveAllUsedSegments(DATA_SCHEMA.getDataSource(), Segments.ONLY_VISIBLE); } protected List publishedDescriptors() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkReaderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkReaderTest.java index f3bc46a75247..7e414ad506b9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkReaderTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkReaderTest.java @@ -62,8 +62,6 @@ @RunWith(MockitoJUnitRunner.class) public class StreamChunkReaderTest { - private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec(null, null, null); - @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -87,7 +85,7 @@ public void testInputformatParseProperly() throws IOException final JsonInputFormat inputFormat = new JsonInputFormat(JSONPathSpec.DEFAULT, Collections.emptyMap(), null, null, null); final StreamChunkReader chunkParser = new StreamChunkReader<>( inputFormat, - new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, ColumnsFilter.all()), + new InputRowSchema(TimestampSpec.DEFAULT, DimensionsSpec.EMPTY, ColumnsFilter.all()), TransformSpec.NONE, temporaryFolder.newFolder(), InputRowFilter.allowAll(), @@ -125,7 +123,7 @@ public void parseEmptyNotEndOfShard() throws IOException ); final StreamChunkReader chunkParser = new StreamChunkReader<>( inputFormat, - new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, ColumnsFilter.all()), + new InputRowSchema(TimestampSpec.DEFAULT, DimensionsSpec.EMPTY, ColumnsFilter.all()), TransformSpec.NONE, temporaryFolder.newFolder(), InputRowFilter.allowAll(), @@ -147,7 +145,7 @@ public void parseEmptyEndOfShard() throws IOException ); final StreamChunkReader chunkParser = new StreamChunkReader<>( inputFormat, - new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, ColumnsFilter.all()), + new InputRowSchema(TimestampSpec.DEFAULT, DimensionsSpec.EMPTY, ColumnsFilter.all()), TransformSpec.NONE, temporaryFolder.newFolder(), InputRowFilter.allowAll(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/TaskAnnouncementTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/TaskAnnouncementTest.java index 61396fc7ae61..09cd8e6c6e5b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/TaskAnnouncementTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/TaskAnnouncementTest.java @@ -21,13 +21,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.data.input.impl.NoopInputSource; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.task.IndexTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; -import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.indexing.DataSchema; import org.junit.Assert; import org.junit.Test; @@ -50,7 +50,7 @@ public void testBackwardsCompatibleSerde() throws Exception "theid", new TaskResource("rofl", 2), new IndexTask.IndexIngestionSpec( - DataSchema.builder().withDataSource("foo").withObjectMapper(new DefaultObjectMapper()).build(), + DataSchema.builder().withDataSource("foo").withTimestamp(TimestampSpec.DEFAULT).build(), ioConfig, null ), diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/TimeAndDimsParseSpec.java b/processing/src/main/java/org/apache/druid/data/input/impl/TimeAndDimsParseSpec.java index 823e0920b7ff..0d894f30cb2c 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/TimeAndDimsParseSpec.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/TimeAndDimsParseSpec.java @@ -37,7 +37,7 @@ public TimeAndDimsParseSpec( ) { super( - timestampSpec != null ? timestampSpec : new TimestampSpec(null, null, null), + timestampSpec != null ? timestampSpec : TimestampSpec.DEFAULT, dimensionsSpec != null ? dimensionsSpec : DimensionsSpec.EMPTY ); } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java b/processing/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java index cdd6ebd4737a..9ec29c9811a4 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java @@ -36,15 +36,13 @@ @PublicApi public class TimestampSpec { - private static class ParseCtx - { - Object lastTimeObject = null; - DateTime lastDateTime = null; - } - - public static final String DEFAULT_COLUMN = "timestamp"; private static final String DEFAULT_FORMAT = "auto"; private static final DateTime DEFAULT_MISSING_VALUE = null; + // remember last value parsed + private static final ThreadLocal PARSE_CTX = ThreadLocal.withInitial(ParseCtx::new); + + public static final String DEFAULT_COLUMN = "timestamp"; + public static final TimestampSpec DEFAULT = new TimestampSpec(null, null, null); private final String timestampColumn; private final String timestampFormat; @@ -53,9 +51,6 @@ private static class ParseCtx /** This field is a derivative of {@link #timestampFormat}; not checked in {@link #equals} and {@link #hashCode} */ private final Function timestampConverter; - // remember last value parsed - private static final ThreadLocal PARSE_CTX = ThreadLocal.withInitial(ParseCtx::new); - @JsonCreator public TimestampSpec( @JsonProperty("column") @Nullable String timestampColumn, @@ -67,9 +62,7 @@ public TimestampSpec( this.timestampColumn = (timestampColumn == null) ? DEFAULT_COLUMN : timestampColumn; this.timestampFormat = format == null ? DEFAULT_FORMAT : format; this.timestampConverter = TimestampParser.createObjectTimestampParser(timestampFormat); - this.missingValue = missingValue == null - ? DEFAULT_MISSING_VALUE - : missingValue; + this.missingValue = missingValue == null ? DEFAULT_MISSING_VALUE : missingValue; } @JsonProperty("column") @@ -183,4 +176,10 @@ public static TimestampSpec mergeTimestampSpec(List toMerge) return result; } + + private static class ParseCtx + { + Object lastTimeObject = null; + DateTime lastDateTime = null; + } } diff --git a/processing/src/main/java/org/apache/druid/error/InvalidInput.java b/processing/src/main/java/org/apache/druid/error/InvalidInput.java index 4d9bc2a83fc6..0ee2262b1e2f 100644 --- a/processing/src/main/java/org/apache/druid/error/InvalidInput.java +++ b/processing/src/main/java/org/apache/druid/error/InvalidInput.java @@ -19,6 +19,8 @@ package org.apache.druid.error; +import javax.annotation.Nullable; + /** * A failure type used to make {@link DruidException}s of category * {@link DruidException.Category#INVALID_INPUT} for persona {@link DruidException.Persona#USER}. @@ -49,6 +51,12 @@ public static void conditionalException(boolean condition, String msg, Object... } } + public static T notNull(@Nullable T val, String name) + { + InvalidInput.conditionalException(val != null, "%s must not be null", name); + return val; + } + /** * evalues a condition. If it's false, it throws the appropriate DruidException with a given cause * diff --git a/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java b/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java index 6dce9018b7a6..cb0b88ea0790 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java +++ b/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java @@ -51,8 +51,7 @@ public FilterSegmentPruner( @Nullable Set filterFields ) { - InvalidInput.conditionalException(filter != null, "filter must not be null"); - this.filter = filter; + this.filter = InvalidInput.notNull(filter, "filter"); this.filterFields = filterFields == null ? filter.getRequiredColumns() : filterFields; this.rangeCache = new HashMap<>(); } diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java index ad1422126dde..0dd87ad5b64c 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java @@ -191,7 +191,7 @@ public void testIncorrectURI() throws IOException, URISyntaxException { final InputEntityIteratingReader inputReader = new InputEntityIteratingReader( new InputRowSchema( - new TimestampSpec(null, null, null), + TimestampSpec.DEFAULT, new DimensionsSpec( DimensionsSpec.getDefaultSchemas(ImmutableList.of("time", "name", "score")) ), diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/StringInputRowParserTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/StringInputRowParserTest.java index e7230bb57491..4dca4583665b 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/StringInputRowParserTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/StringInputRowParserTest.java @@ -35,7 +35,7 @@ public class StringInputRowParserTest public void testDisableJavaScript() { final JavaScriptParseSpec parseSpec = new JavaScriptParseSpec( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec( DimensionsSpec.getDefaultSchemas( ImmutableList.of( @@ -59,7 +59,7 @@ public void testDisableJavaScript() public void testDisableJavaScript2() { final JavaScriptParseSpec parseSpec = new JavaScriptParseSpec( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec( DimensionsSpec.getDefaultSchemas( ImmutableList.of( diff --git a/processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java b/processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java index fad0252d750a..1c328f6229bf 100644 --- a/processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java +++ b/processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java @@ -87,15 +87,13 @@ public class NestedDataTestUtils public static final ObjectMapper JSON_MAPPER; - public static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("timestamp", null, null); - public static final DimensionsSpec AUTO_DISCOVERY = DimensionsSpec.builder() .useSchemaDiscovery(true) .build(); public static final InputRowSchema AUTO_SCHEMA = new InputRowSchema( - TIMESTAMP_SPEC, + TimestampSpec.DEFAULT, AUTO_DISCOVERY, null ); @@ -174,7 +172,7 @@ public static class ResourceFileSegmentBuilder private List inputSources = List.of(ResourceInputSource.of(NestedDataTestUtils.class.getClassLoader(), SIMPLE_DATA_FILE)); private InputFormat inputFormat = TestIndex.DEFAULT_JSON_INPUT_FORMAT; - private TimestampSpec timestampSpec = TIMESTAMP_SPEC; + private TimestampSpec timestampSpec = TimestampSpec.DEFAULT; private DimensionsSpec dimensionsSpec = AUTO_DISCOVERY; private TransformSpec transformSpec = TransformSpec.NONE; private AggregatorFactory[] aggregators = COUNT; diff --git a/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalysisTest.java b/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalysisTest.java index a718314e02d3..1477a16a3261 100644 --- a/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalysisTest.java +++ b/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalysisTest.java @@ -94,7 +94,7 @@ public void testSerde() throws Exception .build(), 16 )), - new TimestampSpec(null, null, null), + TimestampSpec.DEFAULT, Granularities.SECOND, true ); diff --git a/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java index 9c98be20ee35..6a4cadcc79e8 100644 --- a/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.ResourceInputSource; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.io.Closer; @@ -135,7 +136,7 @@ public static void setup() throws IOException .tmpDir(tmp.newFolder()) .schema( IncrementalIndexSchema.builder() - .withTimestampSpec(NestedDataTestUtils.TIMESTAMP_SPEC) + .withTimestampSpec(TimestampSpec.DEFAULT) .withDimensionsSpec(NestedDataTestUtils.AUTO_DISCOVERY) .withQueryGranularity(Granularities.DAY) .withRollup(false) diff --git a/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java b/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java index b8b4221eb6be..97a9d25afd26 100644 --- a/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java +++ b/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java @@ -252,7 +252,7 @@ public QueryableIndex generate( final Transformer transformer = transformSpec.toTransformer(); final InputRowSchema rowSchema = new InputRowSchema( - new TimestampSpec(null, null, null), + TimestampSpec.DEFAULT, dimensionsSpec, null ); @@ -370,7 +370,7 @@ public IncrementalIndex generateIncrementalIndex( final Transformer transformer = transformSpec.toTransformer(); final InputRowSchema rowSchema = new InputRowSchema( - new TimestampSpec(null, null, null), + TimestampSpec.DEFAULT, dimensionsSpec, null ); diff --git a/processing/src/test/java/org/apache/druid/segment/nested/NestedFieldColumnSelectorsTest.java b/processing/src/test/java/org/apache/druid/segment/nested/NestedFieldColumnSelectorsTest.java index 8e694cbcd930..6e2e32a86689 100644 --- a/processing/src/test/java/org/apache/druid/segment/nested/NestedFieldColumnSelectorsTest.java +++ b/processing/src/test/java/org/apache/druid/segment/nested/NestedFieldColumnSelectorsTest.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.nested; import com.fasterxml.jackson.databind.Module; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.error.DruidException; import org.apache.druid.guice.BuiltInTypesModule; import org.apache.druid.java.util.common.granularity.Granularities; @@ -334,7 +335,7 @@ private ColumnSelectorFactory getNumericColumnSelectorFactory(VirtualColumns vir closer, NestedDataTestUtils.NUMERIC_DATA_FILE, TestIndex.DEFAULT_JSON_INPUT_FORMAT, - NestedDataTestUtils.TIMESTAMP_SPEC, + TimestampSpec.DEFAULT, NestedDataTestUtils.AUTO_DISCOVERY, TransformSpec.NONE, NestedDataTestUtils.COUNT, @@ -361,7 +362,7 @@ private VectorColumnSelectorFactory getVectorColumnSelectorFactory(VirtualColumn closer, NestedDataTestUtils.NUMERIC_DATA_FILE, TestIndex.DEFAULT_JSON_INPUT_FORMAT, - NestedDataTestUtils.TIMESTAMP_SPEC, + TimestampSpec.DEFAULT, NestedDataTestUtils.AUTO_DISCOVERY, TransformSpec.NONE, NestedDataTestUtils.COUNT, diff --git a/quidem-ut/src/main/java/org/apache/druid/quidem/KttmNestedComponentSupplier.java b/quidem-ut/src/main/java/org/apache/druid/quidem/KttmNestedComponentSupplier.java index caec860ea0f1..7c81754e6a18 100644 --- a/quidem-ut/src/main/java/org/apache/druid/quidem/KttmNestedComponentSupplier.java +++ b/quidem-ut/src/main/java/org/apache/druid/quidem/KttmNestedComponentSupplier.java @@ -130,7 +130,7 @@ public IncrementalIndex makeKttmNestedIndex() .schema( new IncrementalIndexSchema.Builder() .withRollup(false) - .withTimestampSpec(new TimestampSpec("timestamp", null, null)) + .withTimestampSpec(TimestampSpec.DEFAULT) .withDimensionsSpec(new DimensionsSpec(dimensions)) .build() ) diff --git a/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java index 4e1fba3b363d..3e325b857fe4 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java @@ -56,7 +56,6 @@ public CombinedDataSchema( granularitySpec, transformSpec, projections, - null, null ); this.multiValuedDimensions = multiValuedDimensions; diff --git a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java index a3df6001f056..dbe0836c4dd8 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java @@ -19,13 +19,9 @@ package org.apache.druid.segment.indexing; -import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Multiset; import com.google.common.collect.Sets; @@ -34,8 +30,6 @@ import org.apache.druid.data.input.impl.AggregateProjectionSpec; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; -import org.apache.druid.data.input.impl.InputRowParser; -import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidInput; @@ -57,6 +51,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.stream.Collectors; @@ -83,43 +78,36 @@ public static Builder builder(DataSchema schema) private final AggregatorFactory[] aggregators; private final GranularitySpec granularitySpec; private final TransformSpec transformSpec; - private final Map parserMap; - private final ObjectMapper objectMapper; - - // The below fields can be initialized lazily from parser for backward compatibility. - private TimestampSpec timestampSpec; - private DimensionsSpec dimensionsSpec; - - // This is used for backward compatibility - private InputRowParser inputRowParser; @Nullable - private List projections; + private final TimestampSpec timestampSpec; + @Nullable + private final DimensionsSpec dimensionsSpec; + @Nullable + private final List projections; @JsonCreator public DataSchema( @JsonProperty("dataSource") String dataSource, - @JsonProperty("timestampSpec") @Nullable TimestampSpec timestampSpec, // can be null in old task spec - @JsonProperty("dimensionsSpec") @Nullable DimensionsSpec dimensionsSpec, // can be null in old task spec - @JsonProperty("metricsSpec") AggregatorFactory[] aggregators, - @JsonProperty("granularitySpec") GranularitySpec granularitySpec, + @JsonProperty("timestampSpec") @Nullable TimestampSpec timestampSpec, + @JsonProperty("dimensionsSpec") @Nullable DimensionsSpec dimensionsSpec, + @JsonProperty("metricsSpec") @Nullable AggregatorFactory[] aggregators, + @JsonProperty("granularitySpec") @Nullable GranularitySpec granularitySpec, @JsonProperty("transformSpec") TransformSpec transformSpec, @JsonProperty("projections") @Nullable List projections, - @Deprecated @JsonProperty("parser") @Nullable Map parserMap, - @JacksonInject ObjectMapper objectMapper + @Deprecated @JsonProperty("parser") @Nullable Map parserMap ) { + InvalidInput.conditionalException(parserMap == null, "parser was removed in Druid 37, define the timestampSpec and dimensionsSpec on the schema directly instead of nested inside the parser definition"); validateDatasourceName(dataSource); this.dataSource = dataSource; - this.timestampSpec = timestampSpec; + this.timestampSpec = InvalidInput.notNull(timestampSpec, "timestampSpec"); this.aggregators = aggregators == null ? new AggregatorFactory[]{} : aggregators; - this.dimensionsSpec = dimensionsSpec == null - ? null - : computeDimensionsSpec( - Preconditions.checkNotNull(timestampSpec, "timestampSpec"), - dimensionsSpec, - this.aggregators - ); + this.dimensionsSpec = dimensionsSpec == null ? null : computeDimensionsSpec( + timestampSpec, + dimensionsSpec, + this.aggregators + ); if (granularitySpec == null) { log.warn("No granularitySpec has been specified. Using UniformGranularitySpec as default."); @@ -129,8 +117,6 @@ public DataSchema( } this.transformSpec = transformSpec == null ? TransformSpec.NONE : transformSpec; this.projections = projections; - this.parserMap = parserMap; - this.objectMapper = objectMapper; // Fail-fast if there are output name collisions. Note: because of the pull-from-parser magic in getDimensionsSpec, // this validation is not necessarily going to be able to catch everything. It will run again in getDimensionsSpec. @@ -147,7 +133,6 @@ public DataSchema( dataSource ); } - } @JsonProperty @@ -158,35 +143,15 @@ public String getDataSource() @Nullable @JsonProperty("timestampSpec") - private TimestampSpec getGivenTimestampSpec() - { - return timestampSpec; - } - public TimestampSpec getTimestampSpec() { - if (timestampSpec == null) { - timestampSpec = Preconditions.checkNotNull(getParser(), "inputRowParser").getParseSpec().getTimestampSpec(); - } return timestampSpec; } @Nullable @JsonProperty("dimensionsSpec") - private DimensionsSpec getGivenDimensionsSpec() - { - return dimensionsSpec; - } - public DimensionsSpec getDimensionsSpec() { - if (dimensionsSpec == null) { - dimensionsSpec = computeDimensionsSpec( - getTimestampSpec(), - Preconditions.checkNotNull(getParser(), "inputRowParser").getParseSpec().getDimensionsSpec(), - aggregators - ); - } return dimensionsSpec; } @@ -225,39 +190,6 @@ public List getProjectionNames() return projections.stream().map(AggregateProjectionSpec::getName).collect(Collectors.toList()); } - @Deprecated - @JsonProperty("parser") - @Nullable - @JsonInclude(Include.NON_NULL) - public Map getParserMap() - { - return parserMap; - } - - @Nullable - public InputRowParser getParser() - { - if (inputRowParser == null) { - if (parserMap == null) { - return null; - } - //noinspection unchecked - inputRowParser = transformSpec.decorate(objectMapper.convertValue(this.parserMap, InputRowParser.class)); - ParseSpec parseSpec = inputRowParser.getParseSpec(); - parseSpec = parseSpec.withDimensionsSpec( - computeDimensionsSpec(parseSpec.getTimestampSpec(), parseSpec.getDimensionsSpec(), aggregators) - ); - if (timestampSpec != null) { - parseSpec = parseSpec.withTimestampSpec(timestampSpec); - } - if (dimensionsSpec != null) { - parseSpec = parseSpec.withDimensionsSpec(dimensionsSpec); - } - inputRowParser = inputRowParser.withParseSpec(parseSpec); - } - return inputRowParser; - } - public DataSchema withGranularitySpec(GranularitySpec granularitySpec) { return builder(this).withGranularity(granularitySpec).build(); @@ -281,11 +213,9 @@ public String toString() ", aggregators=" + Arrays.toString(aggregators) + ", granularitySpec=" + granularitySpec + ", transformSpec=" + transformSpec + - ", parserMap=" + parserMap + ", timestampSpec=" + timestampSpec + ", dimensionsSpec=" + dimensionsSpec + ", projections=" + projections + - ", inputRowParser=" + inputRowParser + '}'; } @@ -527,14 +457,42 @@ private static Set getFieldsOrThrowIfErrors(Map } } + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) { + return false; + } + DataSchema that = (DataSchema) o; + return Objects.equals(dataSource, that.dataSource) && + Objects.deepEquals(aggregators, that.aggregators) && + Objects.equals(granularitySpec, that.granularitySpec) && + Objects.equals(transformSpec, that.transformSpec) && + Objects.equals(timestampSpec, that.timestampSpec) && + Objects.equals(dimensionsSpec, that.dimensionsSpec) + && Objects.equals(projections, that.projections); + } + + @Override + public int hashCode() + { + return Objects.hash( + dataSource, + Arrays.hashCode(aggregators), + granularitySpec, + transformSpec, + timestampSpec, + dimensionsSpec, + projections + ); + } + public static class Builder { private String dataSource; private AggregatorFactory[] aggregators; private GranularitySpec granularitySpec; private TransformSpec transformSpec; - private Map parserMap; - private ObjectMapper objectMapper; private TimestampSpec timestampSpec; private DimensionsSpec dimensionsSpec; private List projections; @@ -553,8 +511,6 @@ public Builder(DataSchema schema) this.aggregators = schema.aggregators; this.projections = schema.projections; this.granularitySpec = schema.granularitySpec; - this.parserMap = schema.parserMap; - this.objectMapper = schema.objectMapper; } public Builder withDataSource(String dataSource) @@ -610,20 +566,6 @@ public Builder withProjections(List projections) return this; } - @Deprecated - public Builder withObjectMapper(ObjectMapper objectMapper) - { - this.objectMapper = objectMapper; - return this; - } - - @Deprecated - public Builder withParserMap(Map parserMap) - { - this.parserMap = parserMap; - return this; - } - public DataSchema build() { return new DataSchema( @@ -634,8 +576,7 @@ public DataSchema build() granularitySpec, transformSpec, projections, - parserMap, - objectMapper + null ); } } diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java index c48dfd839463..c70aa08f99ca 100644 --- a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java +++ b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java @@ -71,7 +71,7 @@ public class SqlInputSourceTest private final String TABLE_2 = "FOOS_TABLE_2"; private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema( - new TimestampSpec("timestamp", "auto", null), + TimestampSpec.DEFAULT, new DimensionsSpec( DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b")) ), diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java index 9e83ab71ccf8..1aa8a296ddcf 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java @@ -19,61 +19,50 @@ package org.apache.druid.segment.indexing; -import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.exc.ValueInstantiationException; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.commons.text.StringEscapeUtils; import org.apache.druid.common.utils.IdUtilsTest; -import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.impl.AggregateProjectionSpec; import org.apache.druid.data.input.impl.DimensionsSpec; -import org.apache.druid.data.input.impl.JSONParseSpec; import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; -import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec; import org.apache.druid.indexer.granularity.GranularitySpec; import org.apache.druid.indexer.granularity.UniformGranularitySpec; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.DurationGranularity; import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; -import org.apache.druid.query.expression.TestExprMacroTable; -import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.segment.TestHelper; -import org.apache.druid.segment.transform.ExpressionTransform; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.testing.InitializedNullHandlingTest; import org.hamcrest.MatcherAssert; -import org.hamcrest.Matchers; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; class DataSchemaTest extends InitializedNullHandlingTest { + public static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("time", "auto", null); private static ArbitraryGranularitySpec ARBITRARY_GRANULARITY = new ArbitraryGranularitySpec( Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015")) @@ -84,28 +73,17 @@ class DataSchemaTest extends InitializedNullHandlingTest @Test void testDefaultExclusions() { - Map parser = jsonMapper.convertValue( - new StringInputRowParser( - new JSONParseSpec( - new TimestampSpec("time", "auto", null), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA"))), - null, - null, - null - ), - null - ), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT - ); - DataSchema schema = DataSchema.builder() .withDataSource(IdUtilsTest.VALID_ID_CHARS) - .withParserMap(parser) + .withTimestamp(TIMESTAMP_SPEC) + .withDimensions( + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(List.of("dimB", "dimA"))) + ) .withAggregators( new DoubleSumAggregatorFactory("metric1", "col1"), new DoubleSumAggregatorFactory("metric2", "col2") ) .withGranularity(ARBITRARY_GRANULARITY) - .withObjectMapper(jsonMapper) .build(); Assertions.assertEquals( @@ -117,143 +95,61 @@ void testDefaultExclusions() @Test void testExplicitInclude() { - Map parser = jsonMapper.convertValue( - new StringInputRowParser( - new JSONParseSpec( - new TimestampSpec("time", "auto", null), - DimensionsSpec.builder() - .setDimensions( - DimensionsSpec.getDefaultSchemas(ImmutableList.of("time", "dimA", "dimB", "col2")) - ) - .setDimensionExclusions(ImmutableList.of("dimC")) - .build(), - null, - null, - null - ), - null - ), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT - ); DataSchema schema = DataSchema.builder() .withDataSource(IdUtilsTest.VALID_ID_CHARS) - .withParserMap(parser) + .withTimestamp(TIMESTAMP_SPEC) + .withDimensions( + DimensionsSpec.builder() + .setDimensions( + DimensionsSpec.getDefaultSchemas( + List.of("time", "dimA", "dimB", "col2") + ) + ) + .setDimensionExclusions(ImmutableList.of("dimC")) + .build() + ) .withAggregators( new DoubleSumAggregatorFactory("metric1", "col1"), new DoubleSumAggregatorFactory("metric2", "col2") ) .withGranularity(ARBITRARY_GRANULARITY) - .withObjectMapper(jsonMapper) .build(); Assertions.assertEquals( ImmutableSet.of("__time", "dimC", "col1", "metric1", "metric2"), - schema.getParser().getParseSpec().getDimensionsSpec().getDimensionExclusions() - ); - } - - @Test - void testTransformSpec() - { - Map parserMap = jsonMapper.convertValue( - new StringInputRowParser( - new JSONParseSpec( - new TimestampSpec("time", "auto", null), - new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(ImmutableList.of("time", "dimA", "dimB", "col2")) - ), - null, - null, - null - ), - null - ), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + schema.getDimensionsSpec().getDimensionExclusions() ); - - DataSchema schema = DataSchema.builder() - .withDataSource(IdUtilsTest.VALID_ID_CHARS) - .withParserMap(parserMap) - .withAggregators( - new DoubleSumAggregatorFactory("metric1", "col1"), - new DoubleSumAggregatorFactory("metric2", "col2") - ) - .withGranularity(ARBITRARY_GRANULARITY) - .withTransform( - new TransformSpec( - new SelectorDimFilter("dimA", "foo", null), - ImmutableList.of( - new ExpressionTransform( - "expr", - "concat(dimA,dimA)", - TestExprMacroTable.INSTANCE - ) - ) - ) - ) - .withObjectMapper(jsonMapper) - .build(); - - // Test hack that produces a StringInputRowParser. - final StringInputRowParser parser = (StringInputRowParser) schema.getParser(); - - final InputRow row1bb = parser.parseBatch( - ByteBuffer.wrap("{\"time\":\"2000-01-01\",\"dimA\":\"foo\"}".getBytes(StandardCharsets.UTF_8)) - ).get(0); - Assertions.assertEquals(DateTimes.of("2000-01-01"), row1bb.getTimestamp()); - Assertions.assertEquals("foo", row1bb.getRaw("dimA")); - Assertions.assertEquals("foofoo", row1bb.getRaw("expr")); - - final InputRow row1string = parser.parse("{\"time\":\"2000-01-01\",\"dimA\":\"foo\"}"); - Assertions.assertEquals(DateTimes.of("2000-01-01"), row1string.getTimestamp()); - Assertions.assertEquals("foo", row1string.getRaw("dimA")); - Assertions.assertEquals("foofoo", row1string.getRaw("expr")); - - final InputRow row2 = parser.parseBatch( - ByteBuffer.wrap("{\"time\":\"2000-01-01\",\"dimA\":\"x\"}".getBytes(StandardCharsets.UTF_8)) - ).get(0); - Assertions.assertNull(row2); } @Test void testOverlapMetricNameAndDim() { - Map parser = jsonMapper.convertValue( - new StringInputRowParser( - new JSONParseSpec( - new TimestampSpec("time", "auto", null), - DimensionsSpec.builder() - .setDimensions( - DimensionsSpec.getDefaultSchemas( - ImmutableList.of( - "time", - "dimA", - "dimB", - "metric1" - ) - ) - ) - .setDimensionExclusions(ImmutableList.of("dimC")) - .build(), - null, - null, - null - ), - null - ), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT - ); - - DataSchema schema = DataSchema.builder() - .withDataSource(IdUtilsTest.VALID_ID_CHARS) - .withParserMap(parser) - .withAggregators( - new DoubleSumAggregatorFactory("metric1", "col1"), - new DoubleSumAggregatorFactory("metric2", "col2") - ) - .withGranularity(ARBITRARY_GRANULARITY) - .withObjectMapper(jsonMapper) - .build(); Throwable t = Assertions.assertThrows( DruidException.class, - () -> schema.getParser() + () -> DataSchema.builder() + .withDataSource(IdUtilsTest.VALID_ID_CHARS) + .withTimestamp(TIMESTAMP_SPEC) + .withDimensions( + DimensionsSpec.builder() + .setDimensions( + DimensionsSpec.getDefaultSchemas( + List.of( + "time", + "dimA", + "dimB", + "metric1" + ) + ) + ) + .setDimensionExclusions(List.of("dimC")) + .build() + ) + .withAggregators( + new DoubleSumAggregatorFactory("metric1", "col1"), + new DoubleSumAggregatorFactory("metric2", "col2") + ) + .withGranularity(ARBITRARY_GRANULARITY) + .build() ); Assertions.assertEquals( @@ -267,7 +163,7 @@ void testOverlapTimeAndDimPositionZero() { DataSchema schema = DataSchema.builder() .withDataSource(IdUtilsTest.VALID_ID_CHARS) - .withTimestamp(new TimestampSpec("time", "auto", null)) + .withTimestamp(TIMESTAMP_SPEC) .withDimensions( DimensionsSpec.builder() .setDimensions( @@ -281,7 +177,6 @@ void testOverlapTimeAndDimPositionZero() .build() ) .withGranularity(ARBITRARY_GRANULARITY) - .withObjectMapper(jsonMapper) .build(); Assertions.assertEquals( @@ -299,7 +194,7 @@ void testOverlapTimeAndDimPositionZeroWrongType() DruidException.class, () -> DataSchema.builder() .withDataSource(IdUtilsTest.VALID_ID_CHARS) - .withTimestamp(new TimestampSpec("time", "auto", null)) + .withTimestamp(TIMESTAMP_SPEC) .withDimensions( DimensionsSpec.builder() .setDimensions( @@ -313,7 +208,6 @@ void testOverlapTimeAndDimPositionZeroWrongType() .build() ) .withGranularity(ARBITRARY_GRANULARITY) - .withObjectMapper(jsonMapper) .build() ); @@ -331,7 +225,7 @@ void testOverlapTimeAndDimPositionOne() DruidException.class, () -> DataSchema.builder() .withDataSource(IdUtilsTest.VALID_ID_CHARS) - .withTimestamp(new TimestampSpec("time", "auto", null)) + .withTimestamp(TIMESTAMP_SPEC) .withDimensions( DimensionsSpec.builder() .setDimensions( @@ -345,7 +239,6 @@ void testOverlapTimeAndDimPositionOne() .build() ) .withGranularity(ARBITRARY_GRANULARITY) - .withObjectMapper(jsonMapper) .build() ); @@ -363,7 +256,7 @@ void testOverlapTimeAndDimPositionOne_withExplicitSortOrder() DataSchema schema = DataSchema.builder() .withDataSource(IdUtilsTest.VALID_ID_CHARS) - .withTimestamp(new TimestampSpec("time", "auto", null)) + .withTimestamp(TIMESTAMP_SPEC) .withDimensions( DimensionsSpec.builder() .setDimensions( @@ -378,7 +271,6 @@ void testOverlapTimeAndDimPositionOne_withExplicitSortOrder() .build() ) .withGranularity(ARBITRARY_GRANULARITY) - .withObjectMapper(jsonMapper) .build(); Assertions.assertEquals( @@ -389,76 +281,20 @@ void testOverlapTimeAndDimPositionOne_withExplicitSortOrder() Assertions.assertFalse(schema.getDimensionsSpec().isForceSegmentSortByTime()); } - @Test - void testOverlapTimeAndDimLegacy() - { - Map parser = jsonMapper.convertValue( - new StringInputRowParser( - new JSONParseSpec( - new TimestampSpec("time", "auto", null), - DimensionsSpec.builder() - .setDimensions( - DimensionsSpec.getDefaultSchemas( - ImmutableList.of( - "__time", - "dimA", - "dimB", - "metric1" - ) - ) - ) - .setDimensionExclusions(ImmutableList.of("dimC")) - .build(), - null, - null, - null - ), - null - ), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT - ); - - DataSchema schema = DataSchema.builder() - .withDataSource(IdUtilsTest.VALID_ID_CHARS) - .withParserMap(parser) - .withGranularity(ARBITRARY_GRANULARITY) - .withObjectMapper(jsonMapper) - .build(); - - Throwable t = Assertions.assertThrows( - DruidException.class, - () -> schema.getParser() - ); - - Assertions.assertEquals( - "Encountered dimension[__time] with incorrect type[STRING]. Type must be 'long'.", - t.getMessage() - ); - } - @Test void testDuplicateAggregators() { - Map parser = jsonMapper.convertValue( - new StringInputRowParser( - new JSONParseSpec( - new TimestampSpec("time", "auto", null), - DimensionsSpec.builder() - .setDimensions(DimensionsSpec.getDefaultSchemas(ImmutableList.of("time"))) - .setDimensionExclusions(ImmutableList.of("dimC")) - .build(), - null, - null, - null - ), - null - ), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT - ); - Throwable t = Assertions.assertThrows( DruidException.class, () -> DataSchema.builder() .withDataSource(IdUtilsTest.VALID_ID_CHARS) - .withParserMap(parser) + .withTimestamp(TIMESTAMP_SPEC) + .withDimensions( + DimensionsSpec.builder() + .setDimensions(DimensionsSpec.getDefaultSchemas(ImmutableList.of("time"))) + .setDimensionExclusions(ImmutableList.of("dimC")) + .build() + ) .withAggregators( new DoubleSumAggregatorFactory("metric1", "col1"), new DoubleSumAggregatorFactory("metric2", "col2"), @@ -467,7 +303,6 @@ void testDuplicateAggregators() new DoubleSumAggregatorFactory("metric3", "col5") ) .withGranularity(ARBITRARY_GRANULARITY) - .withObjectMapper(jsonMapper) .build() ); @@ -478,72 +313,26 @@ void testDuplicateAggregators() ); } - @Test - void testSerdeWithInvalidParserMap() throws Exception - { - String jsonStr = "{" - + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(IdUtilsTest.VALID_ID_CHARS) + "\"," - + "\"parser\":{\"type\":\"invalid\"}," - + "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}]," - + "\"granularitySpec\":{" - + "\"type\":\"arbitrary\"," - + "\"queryGranularity\":{\"type\":\"duration\",\"duration\":86400000,\"origin\":\"1970-01-01T00:00:00.000Z\"}," - + "\"intervals\":[\"2014-01-01T00:00:00.000Z/2015-01-01T00:00:00.000Z\"]}}"; - - - //no error on serde as parser is converted to InputRowParser lazily when really needed - DataSchema schema = jsonMapper.readValue( - jsonMapper.writeValueAsString( - jsonMapper.readValue(jsonStr, DataSchema.class) - ), - DataSchema.class - ); - - Throwable t = Assertions.assertThrows( - IllegalArgumentException.class, - () -> schema.getParser() - ); - MatcherAssert.assertThat( - t.getMessage(), - Matchers.startsWith("Cannot construct instance of `org.apache.druid.data.input.impl.StringInputRowParser`, problem: parseSpec") - ); - MatcherAssert.assertThat( - t.getCause(), - Matchers.instanceOf(JsonMappingException.class) - ); - } - @Test void testEmptyDatasource() { - Map parser = jsonMapper.convertValue( - new StringInputRowParser( - new JSONParseSpec( - new TimestampSpec("time", "auto", null), - DimensionsSpec.builder() - .setDimensions( - DimensionsSpec.getDefaultSchemas(ImmutableList.of("time", "dimA", "dimB", "col2")) - ) - .setDimensionExclusions(ImmutableList.of("dimC")) - .build(), - null, - null, - null - ), - null - ), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT - ); - DruidExceptionMatcher.ThrowingSupplier thrower = () -> DataSchema.builder() .withDataSource("") - .withParserMap(parser) + .withTimestamp(TIMESTAMP_SPEC) + .withDimensions( + DimensionsSpec.builder() + .setDimensions( + DimensionsSpec.getDefaultSchemas(List.of("time", "dimA", "dimB", "col2")) + ) + .setDimensionExclusions(List.of("dimC")) + .build() + ) .withAggregators( new DoubleSumAggregatorFactory("metric1", "col1"), new DoubleSumAggregatorFactory("metric2", "col2") ) .withGranularity(ARBITRARY_GRANULARITY) - .withObjectMapper(jsonMapper) .build(); DruidExceptionMatcher.invalidInput() .expectMessageIs("Invalid value for field [dataSource]: must not be null") @@ -569,8 +358,6 @@ void testInvalidWhitespaceDatasource() DruidExceptionMatcher.invalidInput().expectMessageIs(msg).assertThrowsAndMatches( () -> DataSchema.builder() .withDataSource(dataSource) - .withParserMap(Collections.emptyMap()) - .withObjectMapper(jsonMapper) .build() ); } @@ -582,16 +369,9 @@ void testSerde() throws Exception // deserialize, then serialize, then deserialize of DataSchema. String jsonStr = "{" + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(IdUtilsTest.VALID_ID_CHARS) + "\"," - + "\"parser\":{" - + "\"type\":\"string\"," - + "\"parseSpec\":{" - + "\"format\":\"json\"," + "\"timestampSpec\":{\"column\":\"xXx\", \"format\": \"auto\", \"missingValue\": null}," + "\"dimensionsSpec\":{\"dimensions\":[], \"dimensionExclusions\":[]}," + "\"flattenSpec\":{\"useFieldDiscovery\":true, \"fields\":[]}," - + "\"featureSpec\":{}}," - + "\"encoding\":\"UTF-8\"" - + "}," + "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}]," + "\"granularitySpec\":{" + "\"type\":\"arbitrary\"," @@ -607,14 +387,12 @@ void testSerde() throws Exception Assertions.assertEquals(IdUtilsTest.VALID_ID_CHARS, actual.getDataSource()); Assertions.assertEquals( - new JSONParseSpec( - new TimestampSpec("xXx", null, null), - DimensionsSpec.builder().setDimensionExclusions(Arrays.asList("__time", "metric1", "xXx", "col1")).build(), - null, - null, - null - ), - actual.getParser().getParseSpec() + new TimestampSpec("xXx", null, null), + actual.getTimestampSpec() + ); + Assertions.assertEquals( + DimensionsSpec.builder().setDimensionExclusions(Arrays.asList("__time", "metric1", "xXx", "col1")).build(), + actual.getDimensionsSpec() ); Assertions.assertArrayEquals( new AggregatorFactory[]{ @@ -632,6 +410,35 @@ void testSerde() throws Exception Assertions.assertNull(actual.getProjections()); } + @Test + void testSerdeFailsWithParser() throws Exception + { + // deserialize, then serialize, then deserialize of DataSchema. + String jsonStr = "{" + + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(IdUtilsTest.VALID_ID_CHARS) + "\"," + + "\"parser\":{" + + "\"type\":\"string\"," + + "\"parseSpec\":{" + + "\"format\":\"json\"," + + "\"timestampSpec\":{\"column\":\"xXx\", \"format\": \"auto\", \"missingValue\": null}," + + "\"dimensionsSpec\":{\"dimensions\":[], \"dimensionExclusions\":[]}," + + "\"flattenSpec\":{\"useFieldDiscovery\":true, \"fields\":[]}," + + "\"featureSpec\":{}}," + + "\"encoding\":\"UTF-8\"" + + "}," + + "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}]," + + "\"granularitySpec\":{" + + "\"type\":\"arbitrary\"," + + "\"queryGranularity\":{\"type\":\"duration\",\"duration\":86400000,\"origin\":\"1970-01-01T00:00:00.000Z\"}," + + "\"intervals\":[\"2014-01-01T00:00:00.000Z/2015-01-01T00:00:00.000Z\"]}}"; + + Throwable t = Assertions.assertThrows( + ValueInstantiationException.class, + () -> jsonMapper.readValue(jsonStr, DataSchema.class) + ); + Assertions.assertTrue(t.getMessage().contains("parser was removed in Druid 37")); + } + @Test public void testSerdeWithProjections() throws Exception { @@ -648,7 +455,7 @@ public void testSerdeWithProjections() throws Exception .build(); DataSchema original = DataSchema.builder() .withDataSource("datasource") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TIMESTAMP_SPEC) .withDimensions(DimensionsSpec.EMPTY) .withAggregators(new CountAggregatorFactory("rows")) .withProjections(ImmutableList.of(projectionSpec)) @@ -657,7 +464,10 @@ public void testSerdeWithProjections() throws Exception DataSchema serdeResult = jsonMapper.readValue(jsonMapper.writeValueAsString(original), DataSchema.class); Assertions.assertEquals("datasource", serdeResult.getDataSource()); - Assertions.assertArrayEquals(new AggregatorFactory[]{new CountAggregatorFactory("rows")}, serdeResult.getAggregators()); + Assertions.assertArrayEquals( + new AggregatorFactory[]{new CountAggregatorFactory("rows")}, + serdeResult.getAggregators() + ); Assertions.assertEquals(ImmutableList.of(projectionSpec), serdeResult.getProjections()); Assertions.assertEquals(ImmutableList.of("ab_count_projection"), serdeResult.getProjectionNames()); Assertions.assertEquals(jsonMapper.writeValueAsString(original), jsonMapper.writeValueAsString(serdeResult)); @@ -683,16 +493,9 @@ void testSerializeWithInvalidDataSourceName() throws Exception for (Map.Entry entry : datasourceToErrorMsg.entrySet()) { String jsonStr = "{" + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(entry.getKey()) + "\"," - + "\"parser\":{" - + "\"type\":\"string\"," - + "\"parseSpec\":{" - + "\"format\":\"json\"," + "\"timestampSpec\":{\"column\":\"xXx\", \"format\": \"auto\", \"missingValue\": null}," + "\"dimensionsSpec\":{\"dimensions\":[], \"dimensionExclusions\":[]}," + "\"flattenSpec\":{\"useFieldDiscovery\":true, \"fields\":[]}," - + "\"featureSpec\":{}}," - + "\"encoding\":\"UTF-8\"" - + "}," + "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}]," + "\"granularitySpec\":{" + "\"type\":\"arbitrary\"," @@ -723,70 +526,46 @@ void testSerializeWithInvalidDataSourceName() throws Exception @Test void testSerdeWithUpdatedDataSchemaAddedField() throws IOException { - Map parser = jsonMapper.convertValue( - new StringInputRowParser( - new JSONParseSpec( - new TimestampSpec("time", "auto", null), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA"))), - null, - null, - null - ), - null - ), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT - ); - DataSchema originalSchema = DataSchema.builder() .withDataSource(IdUtilsTest.VALID_ID_CHARS) - .withParserMap(parser) + .withTimestamp(TIMESTAMP_SPEC) + .withDimensions( + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(List.of("dimB", "dimA")) + ) + ) .withAggregators( new DoubleSumAggregatorFactory("metric1", "col1"), new DoubleSumAggregatorFactory("metric2", "col2") ) .withGranularity(ARBITRARY_GRANULARITY) - .withObjectMapper(jsonMapper) .build(); String serialized = jsonMapper.writeValueAsString(originalSchema); TestModifiedDataSchema deserialized = jsonMapper.readValue(serialized, TestModifiedDataSchema.class); - Assertions.assertEquals(null, deserialized.getExtra()); + Assertions.assertNull(deserialized.getExtra()); Assertions.assertEquals(originalSchema.getDataSource(), deserialized.getDataSource()); Assertions.assertEquals(originalSchema.getGranularitySpec(), deserialized.getGranularitySpec()); - Assertions.assertEquals(originalSchema.getParser().getParseSpec(), deserialized.getParser().getParseSpec()); + Assertions.assertEquals(originalSchema.getTimestampSpec(), deserialized.getTimestampSpec()); + Assertions.assertEquals(originalSchema.getDimensionsSpec(), deserialized.getDimensionsSpec()); Assertions.assertArrayEquals(originalSchema.getAggregators(), deserialized.getAggregators()); Assertions.assertEquals(originalSchema.getTransformSpec(), deserialized.getTransformSpec()); - Assertions.assertEquals(originalSchema.getParserMap(), deserialized.getParserMap()); } @Test void testSerdeWithUpdatedDataSchemaRemovedField() throws IOException { - Map parser = jsonMapper.convertValue( - new StringInputRowParser( - new JSONParseSpec( - new TimestampSpec("time", "auto", null), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA"))), - null, - null, - null - ), - null - ), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT - ); - TestModifiedDataSchema originalSchema = new TestModifiedDataSchema( IdUtilsTest.VALID_ID_CHARS, - null, - null, + TIMESTAMP_SPEC, + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA"))), new AggregatorFactory[]{ new DoubleSumAggregatorFactory("metric1", "col1"), new DoubleSumAggregatorFactory("metric2", "col2") }, ARBITRARY_GRANULARITY, null, - parser, - jsonMapper, "some arbitrary string" ); @@ -795,10 +574,10 @@ void testSerdeWithUpdatedDataSchemaRemovedField() throws IOException Assertions.assertEquals(originalSchema.getDataSource(), deserialized.getDataSource()); Assertions.assertEquals(originalSchema.getGranularitySpec(), deserialized.getGranularitySpec()); - Assertions.assertEquals(originalSchema.getParser().getParseSpec(), deserialized.getParser().getParseSpec()); + Assertions.assertEquals(originalSchema.getTimestampSpec(), deserialized.getTimestampSpec()); + Assertions.assertEquals(originalSchema.getDimensionsSpec(), deserialized.getDimensionsSpec()); Assertions.assertArrayEquals(originalSchema.getAggregators(), deserialized.getAggregators()); Assertions.assertEquals(originalSchema.getTransformSpec(), deserialized.getTransformSpec()); - Assertions.assertEquals(originalSchema.getParserMap(), deserialized.getParserMap()); } @Test @@ -811,7 +590,6 @@ void testWithDimensionSpec() AggregatorFactory aggFactory = Mockito.mock(AggregatorFactory.class); Mockito.when(aggFactory.getName()).thenReturn("myAgg"); TransformSpec transSpec = Mockito.mock(TransformSpec.class); - Map parserMap = Mockito.mock(Map.class); Mockito.when(newDimSpec.withDimensionExclusions(ArgumentMatchers.any(Set.class))).thenReturn(newDimSpec); DataSchema oldSchema = DataSchema.builder() @@ -821,8 +599,6 @@ void testWithDimensionSpec() .withAggregators(aggFactory) .withGranularity(gSpec) .withTransform(transSpec) - .withParserMap(parserMap) - .withObjectMapper(jsonMapper) .build(); DataSchema newSchema = oldSchema.withDimensionsSpec(newDimSpec); Assertions.assertSame(oldSchema.getDataSource(), newSchema.getDataSource()); @@ -831,7 +607,6 @@ void testWithDimensionSpec() Assertions.assertSame(oldSchema.getAggregators(), newSchema.getAggregators()); Assertions.assertSame(oldSchema.getGranularitySpec(), newSchema.getGranularitySpec()); Assertions.assertSame(oldSchema.getTransformSpec(), newSchema.getTransformSpec()); - Assertions.assertSame(oldSchema.getParserMap(), newSchema.getParserMap()); } @@ -842,7 +617,7 @@ void testCombinedDataSchemaSetsMultiValuedColumnsInfo() CombinedDataSchema schema = new CombinedDataSchema( IdUtilsTest.VALID_ID_CHARS, - new TimestampSpec("time", "auto", null), + TIMESTAMP_SPEC, DimensionsSpec.builder() .setDimensions( DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimA", "dimB", "metric1")) @@ -865,6 +640,7 @@ void testInvalidProjectionDupeNames() DruidException.class, () -> DataSchema.builder() .withDataSource("dataSource") + .withTimestamp(TIMESTAMP_SPEC) .withGranularity( new UniformGranularitySpec( Granularities.HOUR, @@ -910,6 +686,7 @@ void testInvalidProjectionGranularity() DruidException.class, () -> DataSchema.builder() .withDataSource("dataSource") + .withTimestamp(TIMESTAMP_SPEC) .withGranularity( new UniformGranularitySpec( Granularities.HOUR, @@ -953,6 +730,7 @@ void testInvalidProjectionDupeGroupingNames() DruidException.class, () -> DataSchema.builder() .withDataSource("dataSource") + .withTimestamp(TIMESTAMP_SPEC) .withGranularity( new UniformGranularitySpec( Granularities.HOUR, @@ -967,7 +745,10 @@ void testInvalidProjectionDupeGroupingNames() .virtualColumns( Granularities.toVirtualColumn(Granularities.HOUR, "g") ) - .groupingColumns(new LongDimensionSchema("g"), new StringDimensionSchema("g")) + .groupingColumns( + new LongDimensionSchema("g"), + new StringDimensionSchema("g") + ) .aggregators(new CountAggregatorFactory("count")) .build() ) @@ -988,6 +769,7 @@ void testInvalidProjectionDupeAggNames() DruidException.class, () -> DataSchema.builder() .withDataSource("dataSource") + .withTimestamp(TIMESTAMP_SPEC) .withGranularity( new UniformGranularitySpec( Granularities.HOUR, @@ -1018,4 +800,10 @@ void testInvalidProjectionDupeAggNames() t.getMessage() ); } + + @Test + void testEqualsAndHashcode() + { + EqualsVerifier.forClass(DataSchema.class).usingGetClass().verify(); + } } diff --git a/server/src/test/java/org/apache/druid/segment/indexing/TestModifiedDataSchema.java b/server/src/test/java/org/apache/druid/segment/indexing/TestModifiedDataSchema.java index bae5140392be..e8fb1eace9bc 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/TestModifiedDataSchema.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/TestModifiedDataSchema.java @@ -19,19 +19,14 @@ package org.apache.druid.segment.indexing; -import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.granularity.GranularitySpec; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.transform.TransformSpec; -import javax.annotation.Nullable; -import java.util.Map; - public class TestModifiedDataSchema extends DataSchema { private final String extra; @@ -44,8 +39,6 @@ public TestModifiedDataSchema( @JsonProperty("metricsSpec") AggregatorFactory[] aggregators, @JsonProperty("granularitySpec") GranularitySpec granularitySpec, @JsonProperty("transformSpec") TransformSpec transformSpec, - @JsonProperty("parser") @Nullable Map parserMap, - @JacksonInject ObjectMapper objectMapper, @JsonProperty("extra") String extra ) { @@ -57,8 +50,7 @@ public TestModifiedDataSchema( granularitySpec, transformSpec, null, - parserMap, - objectMapper + null ); this.extra = extra; } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java index abd58fb085c7..255628b21c6d 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java @@ -21,8 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.data.input.impl.DimensionsSpec; -import org.apache.druid.data.input.impl.JSONParseSpec; -import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.granularity.UniformGranularitySpec; import org.apache.druid.jackson.DefaultObjectMapper; @@ -137,21 +135,10 @@ public BatchAppenderatorTester( objectMapper = new DefaultObjectMapper(); objectMapper.registerSubtypes(LinearShardSpec.class); - final Map parserMap = objectMapper.convertValue( - new MapInputRowParser( - new JSONParseSpec( - new TimestampSpec("ts", "auto", null), - DimensionsSpec.EMPTY, - null, - null, - null - ) - ), - Map.class - ); - schema = DataSchema.builder() .withDataSource(DATASOURCE) + .withTimestamp(new TimestampSpec("ts", "auto", null)) + .withDimensions(DimensionsSpec.EMPTY) .withAggregators( new CountAggregatorFactory("count"), new LongSumAggregatorFactory("met", "met") @@ -159,8 +146,6 @@ public BatchAppenderatorTester( .withGranularity( new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null) ) - .withParserMap(parserMap) - .withObjectMapper(objectMapper) .build(); tuningConfig = new TestAppenderatorConfig( diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java index 135b7e966987..5d6591991088 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java @@ -27,8 +27,6 @@ import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.client.cache.MapCache; import org.apache.druid.data.input.impl.DimensionsSpec; -import org.apache.druid.data.input.impl.JSONParseSpec; -import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.guice.BuiltInTypesModule; import org.apache.druid.indexer.granularity.UniformGranularitySpec; @@ -129,27 +127,15 @@ public StreamAppenderatorTester( .addValue(ObjectMapper.class.getName(), objectMapper) ); - final Map parserMap = objectMapper.convertValue( - new MapInputRowParser( - new JSONParseSpec( - new TimestampSpec("ts", "auto", null), - DimensionsSpec.EMPTY, - null, - null, - null - ) - ), - Map.class - ); schema = DataSchema.builder() .withDataSource(DATASOURCE) - .withParserMap(parserMap) + .withTimestamp(new TimestampSpec("ts", "auto", null)) + .withDimensions(DimensionsSpec.EMPTY) .withAggregators( new CountAggregatorFactory("count"), new LongSumAggregatorFactory("met", "met") ) .withGranularity(new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null)) - .withObjectMapper(objectMapper) .build(); tuningConfig = new TestAppenderatorConfig( TuningConfig.DEFAULT_APPENDABLE_INDEX, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/sink/SinkTest.java b/server/src/test/java/org/apache/druid/segment/realtime/sink/SinkTest.java index 99dd23f345a2..033d4a1dfa60 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/sink/SinkTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/sink/SinkTest.java @@ -77,7 +77,7 @@ public void testSwap() throws Exception final DataSchema schema = DataSchema.builder() .withDataSource("test") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(DimensionsSpec.EMPTY) .withAggregators(new CountAggregatorFactory("rows")) .withGranularity(new UniformGranularitySpec(Granularities.HOUR, Granularities.MINUTE, null)) @@ -254,7 +254,7 @@ public void testGetSinkSignature() final DataSchema schema = DataSchema.builder() .withDataSource("test") - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions( new StringDimensionSchema("dim1"), new LongDimensionSchema("dimLong") diff --git a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java index de3812a33eea..8851c16a4522 100644 --- a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java +++ b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java @@ -414,7 +414,7 @@ public TestStreamingTask( null, DataSchema.builder() .withDataSource(datasource) - .withTimestamp(new TimestampSpec(null, null, null)) + .withTimestamp(TimestampSpec.DEFAULT) .withDimensions(new DimensionsSpec(Collections.emptyList())) .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) .build(), diff --git a/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java b/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java index 186d109b1f53..8db258c7f53b 100644 --- a/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java +++ b/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java @@ -355,7 +355,7 @@ public static List createSegments( closer, "nested-test-data.json", TestIndex.DEFAULT_JSON_INPUT_FORMAT, - new TimestampSpec("timestamp", null, null), + TimestampSpec.DEFAULT, DimensionsSpec.builder().useSchemaDiscovery(true).build(), null, new AggregatorFactory[] { diff --git a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java index 52f1bf4f6263..2a7f8c977132 100644 --- a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java @@ -24,6 +24,7 @@ import com.google.inject.Injector; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.LocalInputSource; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.indexer.granularity.UniformGranularitySpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; @@ -134,8 +135,8 @@ public void testTaskValidator() throws Exception new IndexTask.IndexIngestionSpec( DataSchema.builder() .withDataSource("foo") + .withTimestamp(TimestampSpec.DEFAULT) .withGranularity(new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null)) - .withObjectMapper(jsonMapper) .build(), new IndexTask.IndexIOConfig( new LocalInputSource(new File("lol"), "rofl"),