Skip to content
This repository has been archived by the owner on Nov 12, 2024. It is now read-only.

Commit

Permalink
use appropriate topology names (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
laxmanchekka authored Sep 29, 2023
1 parent e52e516 commit f3b58bd
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 5 deletions.
2 changes: 1 addition & 1 deletion view-generator-framework/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ dependencies {
testImplementation("org.junit-pioneer:junit-pioneer:2.0.0")
testImplementation("org.mockito:mockito-core:5.2.0")
testImplementation("org.apache.kafka:kafka-streams-test-utils:7.2.1-ccs")
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.20.0")
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j2-impl:2.20.0")
}

// Disabling compatibility check for the test avro definitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public StreamsBuilder buildTopology(
String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY);

KStream<String, Object> mergedStream = null;

int mergedStreamId = 0;
for (String topic : inputTopics) {
KStream<String, Object> inputStream = (KStream<String, Object>) inputStreams.get(topic);

Expand All @@ -64,7 +64,10 @@ public StreamsBuilder buildTopology(
if (mergedStream == null) {
mergedStream = inputStream;
} else {
mergedStream = mergedStream.merge(inputStream, Named.as("merged-stream"));
mergedStream =
mergedStream.merge(
inputStream, Named.as("merged-stream-" + getViewGenName() + "-" + mergedStreamId));
mergedStreamId++;
}
}

Expand All @@ -83,7 +86,9 @@ public StreamsBuilder buildTopology(
}

mergedStream
.process(() -> new ViewGenerationProcessor(getJobConfigKey()))
.process(
() -> new ViewGenerationProcessor(getJobConfigKey()),
Named.as("processor-" + getViewGenName()))
.to(
outputTopic,
Produced.with(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.hypertrace.core.kafkastreams.framework.serdes.AvroSerde;
import org.hypertrace.core.serviceframework.config.ConfigClientFactory;
Expand All @@ -29,8 +30,12 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.SetEnvironmentVariable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultiViewGeneratorLauncherTest {
private static final Logger logger =
LoggerFactory.getLogger(MultiViewGeneratorLauncherTest.class);
private MultiViewGeneratorLauncher underTest;
private List<TestInputTopic<String, SpanTypeOne>> inputTopics = new ArrayList<>();
private TestOutputTopic<String, SpanTypeTwo> spanTypeTwoOutputTopic;
Expand All @@ -54,10 +59,11 @@ public void setUp() {
StreamsBuilder streamsBuilder =
underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>());

Topology testTopology = streamsBuilder.build();
Properties props = new Properties();
mergedProps.forEach(props::put);

TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props);
TopologyTestDriver td = new TopologyTestDriver(testTopology, props);

Serde<SpanTypeOne> spanTypeOneSerde = new AvroSerde<>();
spanTypeOneSerde.configure(Map.of(), false);
Expand Down Expand Up @@ -87,6 +93,8 @@ public void setUp() {
"test-raw-service-type-output-topic",
new StringSerde().deserializer(),
rawServiceTypeSerde.deserializer());

logger.info("test topology: {}", testTopology.describe());
}

@Test
Expand Down
8 changes: 8 additions & 0 deletions view-generator-framework/src/test/resources/log4j2.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
status=error
name=PropertiesConfig
appender.console.type=Console
appender.console.name=STDOUT
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n
rootLogger.level=INFO
rootLogger.appenderRef.stdout.ref=STDOUT

0 comments on commit f3b58bd

Please sign in to comment.