Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ private KafkaSupervisorSpec createKafkaSupervisor(String topic)
{
return MoreResources.Supervisor.KAFKA_JSON
.get()
.withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", null, null)))
.withDataSchema(schema -> schema.withTimestamp(TimestampSpec.DEFAULT))
.withIoConfig(
ioConfig -> ioConfig
.withConsumerProperties(kafkaServer.consumerProperties())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public SupervisorSpec createSupervisor(String dataSource, String topic, InputFor
{
return MoreResources.Supervisor.KAFKA_JSON
.get()
.withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", null, null)))
.withDataSchema(schema -> schema.withTimestamp(TimestampSpec.DEFAULT))
.withIoConfig(
ioConfig -> ioConfig
.withInputFormat(inputFormat)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ protected KinesisSupervisorSpec createKinesisSupervisor(KinesisResource kinesis,
null,
DataSchema.builder()
.withDataSource(dataSource)
.withTimestamp(new TimestampSpec("timestamp", null, null))
.withTimestamp(TimestampSpec.DEFAULT)
.withGranularity(new UniformGranularitySpec(Granularities.HOUR, null, null))
.withDimensions(DimensionsSpec.EMPTY)
.build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private KinesisSupervisorSpec createKinesisSupervisorSpec(String dataSource, Str
null,
DataSchema.builder()
.withDataSource(dataSource)
.withTimestamp(new TimestampSpec("timestamp", null, null))
.withTimestamp(TimestampSpec.DEFAULT)
.withGranularity(new UniformGranularitySpec(Granularities.HOUR, null, null))
.withDimensions(DimensionsSpec.EMPTY)
.build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void test_postSupervisor_fails_ifRequiredExtensionIsNotLoaded()
{
final KafkaSupervisorSpec kafkaSupervisor = MoreResources.Supervisor.KAFKA_JSON
.get()
.withDataSchema(schema -> schema.withTimestamp(new TimestampSpec(null, null, null)))
.withDataSchema(schema -> schema.withTimestamp(TimestampSpec.DEFAULT))
.withIoConfig(ioConfig -> ioConfig.withConsumerProperties(Map.of("bootstrap.servers", "localhost")))
.build(dataSource, "topic");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ public ThriftInputFormat(
{
super(flattenSpec);
this.thriftJar = thriftJar;
InvalidInput.conditionalException(thriftClass != null, "thriftClass must not be null");
this.thriftClass = thriftClass;
this.thriftClass = InvalidInput.notNull(thriftClass, "thriftClass");
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void testParse() throws Exception
public void testDisableJavaScript()
{
final JavaScriptParseSpec parseSpec = new JavaScriptParseSpec(
new TimestampSpec("timestamp", "auto", null),
TimestampSpec.DEFAULT,
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(
ImmutableList.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,11 @@ private InputEntityReader createReader(
{
final GenericRecord someAvroDatum = AvroStreamInputFormatTest.buildSomeAvroDatum();
final File someAvroFile = AvroStreamInputFormatTest.createAvroFile(someAvroDatum);
final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
final DimensionsSpec dimensionsSpec = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
"eventType")));

final AvroOCFInputFormat inputFormat = new AvroOCFInputFormat(mapper, null, readerSchema, null, null);
final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, ColumnsFilter.all());
final InputRowSchema schema = new InputRowSchema(TimestampSpec.DEFAULT, dimensionsSpec, ColumnsFilter.all());
final FileEntity entity = new FileEntity(someAvroFile);
return inputFormat.createReader(schema, entity, temporaryFolder.newFolder());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
private static final HdfsInputSourceConfig DEFAULT_INPUT_SOURCE_CONFIG = new HdfsInputSourceConfig(null);
private static final String COLUMN = "value";
private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
new TimestampSpec(null, null, null),
TimestampSpec.DEFAULT,
DimensionsSpec.EMPTY,
ColumnsFilter.all()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ public void testIngestNullColumnAfterDataInserted() throws Exception
);
final KafkaIndexTask task = createTask(
null,
NEW_DATA_SCHEMA.withDimensionsSpec(dimensionsSpec),
DATA_SCHEMA.withDimensionsSpec(dimensionsSpec),
new KafkaIndexTaskIOConfig(
0,
"sequence0",
Expand Down Expand Up @@ -463,7 +463,7 @@ public void testIngestNullColumnAfterDataInserted_storeEmptyColumnsOff_shouldNot

final KafkaIndexTask task = createTask(
null,
NEW_DATA_SCHEMA.withDimensionsSpec(
DATA_SCHEMA.withDimensionsSpec(
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("dim1"),
Expand Down Expand Up @@ -667,7 +667,7 @@ public void testIncrementalHandOff() throws Exception
Assert.assertTrue(
checkpointRequestsHash.contains(
Objects.hash(
NEW_DATA_SCHEMA.getDataSource(),
DATA_SCHEMA.getDataSource(),
0,
new KafkaDataSourceMetadata(startPartitions)
)
Expand Down Expand Up @@ -797,7 +797,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception
Assert.assertTrue(
checkpointRequestsHash.contains(
Objects.hash(
NEW_DATA_SCHEMA.getDataSource(),
DATA_SCHEMA.getDataSource(),
0,
new KafkaDataSourceMetadata(startPartitions)
)
Expand All @@ -806,7 +806,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception
Assert.assertTrue(
checkpointRequestsHash.contains(
Objects.hash(
NEW_DATA_SCHEMA.getDataSource(),
DATA_SCHEMA.getDataSource(),
0,
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, currentOffsets, ImmutableSet.of())
Expand Down Expand Up @@ -905,7 +905,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception
Assert.assertTrue(
checkpointRequestsHash.contains(
Objects.hash(
NEW_DATA_SCHEMA.getDataSource(),
DATA_SCHEMA.getDataSource(),
0,
new KafkaDataSourceMetadata(startPartitions)
)
Expand Down Expand Up @@ -990,7 +990,7 @@ public void testCheckpointResetWithSameEndOffsets() throws Exception

DataSourceMetadata newDataSchemaMetadata()
{
return metadataStorageCoordinator.retrieveDataSourceMetadata(NEW_DATA_SCHEMA.getDataSource());
return metadataStorageCoordinator.retrieveDataSourceMetadata(DATA_SCHEMA.getDataSource());
}

@Test(timeout = 60_000L)
Expand Down Expand Up @@ -1201,7 +1201,7 @@ public void testRunWithTransformSpec() throws Exception
{
final KafkaIndexTask task = createTask(
null,
NEW_DATA_SCHEMA.withTransformSpec(
DATA_SCHEMA.withTransformSpec(
new TransformSpec(
new SelectorDimFilter("dim1", "b", null),
ImmutableList.of(
Expand Down Expand Up @@ -2714,7 +2714,7 @@ public void testSerde() throws Exception

final KafkaIndexTask task = createTask(
"taskid",
NEW_DATA_SCHEMA.withTransformSpec(
DATA_SCHEMA.withTransformSpec(
new TransformSpec(
null,
ImmutableList.of(new ExpressionTransform("beep", "nofunc()", ExprMacroTable.nil()))
Expand Down Expand Up @@ -2747,7 +2747,7 @@ public void testCorrectInputSources() throws Exception

final KafkaIndexTask task = createTask(
"taskid",
NEW_DATA_SCHEMA.withTransformSpec(
DATA_SCHEMA.withTransformSpec(
new TransformSpec(
null,
ImmutableList.of(new ExpressionTransform("beep", "nofunc()", ExprMacroTable.nil()))
Expand Down Expand Up @@ -2809,7 +2809,7 @@ private void awaitConsumedOffsets(final KafkaIndexTask task, final Map<KafkaTopi

private List<Map<String, Object>> scanData(final Task task, QuerySegmentSpec spec)
{
ScanQuery query = new Druids.ScanQueryBuilder().dataSource(NEW_DATA_SCHEMA.getDataSource())
ScanQuery query = new Druids.ScanQueryBuilder().dataSource(DATA_SCHEMA.getDataSource())
.intervals(spec)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
.build();
Expand Down Expand Up @@ -2866,7 +2866,7 @@ private KafkaIndexTask createTask(
final KafkaIndexTaskIOConfig ioConfig
) throws JsonProcessingException
{
return createTask(taskId, NEW_DATA_SCHEMA, ioConfig);
return createTask(taskId, DATA_SCHEMA, ioConfig);
}

private KafkaIndexTask createTask(
Expand All @@ -2875,7 +2875,7 @@ private KafkaIndexTask createTask(
final Map<String, Object> context
) throws JsonProcessingException
{
return createTask(taskId, NEW_DATA_SCHEMA, ioConfig, context);
return createTask(taskId, DATA_SCHEMA, ioConfig, context);
}

private KafkaIndexTask createTask(
Expand Down Expand Up @@ -2945,7 +2945,7 @@ private KafkaIndexTask createTask(

private static DataSchema cloneDataSchema(final DataSchema dataSchema)
{
return DataSchema.builder(dataSchema).withObjectMapper(OBJECT_MAPPER).build();
return DataSchema.builder(dataSchema).build();
}

@Override
Expand Down Expand Up @@ -3374,7 +3374,7 @@ public void testTaskWithTransformSpecDoesNotCauseCliPeonCyclicDependency()

final KafkaIndexTask task = createTask(
"index_kafka_test_id1",
NEW_DATA_SCHEMA.withTransformSpec(
DATA_SCHEMA.withTransformSpec(
new TransformSpec(
new SelectorDimFilter("dim1", "b", null),
ImmutableList.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ private static byte[] jb(String timestamp, String dim1, String dim2, String dimL
public void testInvalidKafkaConfig()
{
KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpecBuilder()
.withDataSchema(DATA_SCHEMA)
.withIoConfig(
ioConfig -> ioConfig
.withJsonInputFormat()
Expand All @@ -388,6 +389,7 @@ public void testInvalidKafkaConfig()
public void testGetInputSourceResources()
{
KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpecBuilder()
.withDataSchema(DATA_SCHEMA)
.withIoConfig(
ioConfig -> ioConfig
.withJsonInputFormat()
Expand Down
Loading
Loading