From f3b58bdeb658d25e387f7a0cca73e1246c0e84fe Mon Sep 17 00:00:00 2001 From: Laxman Ch <60599147+laxmanchekka@users.noreply.github.com> Date: Fri, 29 Sep 2023 18:53:15 +0530 Subject: [PATCH] use appropriate topology names (#80) --- view-generator-framework/build.gradle.kts | 2 +- .../viewgenerator/service/ViewGeneratorLauncher.java | 11 ++++++++--- .../viewgenerator/MultiViewGeneratorLauncherTest.java | 10 +++++++++- .../src/test/resources/log4j2.properties | 8 ++++++++ 4 files changed, 26 insertions(+), 5 deletions(-) create mode 100644 view-generator-framework/src/test/resources/log4j2.properties diff --git a/view-generator-framework/build.gradle.kts b/view-generator-framework/build.gradle.kts index fd2f492..cd8e7de 100644 --- a/view-generator-framework/build.gradle.kts +++ b/view-generator-framework/build.gradle.kts @@ -25,7 +25,7 @@ dependencies { testImplementation("org.junit-pioneer:junit-pioneer:2.0.0") testImplementation("org.mockito:mockito-core:5.2.0") testImplementation("org.apache.kafka:kafka-streams-test-utils:7.2.1-ccs") - testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.20.0") + testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j2-impl:2.20.0") } // Disabling compatibility check for the test avro definitions. diff --git a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java index 08662b2..98e768e 100644 --- a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java +++ b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java @@ -50,7 +50,7 @@ public StreamsBuilder buildTopology( String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY); KStream mergedStream = null; - + int mergedStreamId = 0; for (String topic : inputTopics) { KStream inputStream = (KStream) inputStreams.get(topic); @@ -64,7 +64,10 @@ public StreamsBuilder buildTopology( if (mergedStream == null) { mergedStream = inputStream; } else { - mergedStream = mergedStream.merge(inputStream, Named.as("merged-stream")); + mergedStream = + mergedStream.merge( + inputStream, Named.as("merged-stream-" + getViewGenName() + "-" + mergedStreamId)); + mergedStreamId++; } } @@ -83,7 +86,9 @@ public StreamsBuilder buildTopology( } mergedStream - .process(() -> new ViewGenerationProcessor(getJobConfigKey())) + .process( + () -> new ViewGenerationProcessor(getJobConfigKey()), + Named.as("processor-" + getViewGenName())) .to( outputTopic, Produced.with( diff --git a/view-generator-framework/src/test/java/org/hypertrace/core/viewgenerator/MultiViewGeneratorLauncherTest.java b/view-generator-framework/src/test/java/org/hypertrace/core/viewgenerator/MultiViewGeneratorLauncherTest.java index 17fc355..6ec9321 100644 --- a/view-generator-framework/src/test/java/org/hypertrace/core/viewgenerator/MultiViewGeneratorLauncherTest.java +++ b/view-generator-framework/src/test/java/org/hypertrace/core/viewgenerator/MultiViewGeneratorLauncherTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; import org.hypertrace.core.kafkastreams.framework.serdes.AvroSerde; import org.hypertrace.core.serviceframework.config.ConfigClientFactory; @@ -29,8 +30,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junitpioneer.jupiter.SetEnvironmentVariable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MultiViewGeneratorLauncherTest { + private static final Logger logger = + LoggerFactory.getLogger(MultiViewGeneratorLauncherTest.class); private MultiViewGeneratorLauncher underTest; private List> inputTopics = new ArrayList<>(); private TestOutputTopic spanTypeTwoOutputTopic; @@ -54,10 +59,11 @@ public void setUp() { StreamsBuilder streamsBuilder = underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>()); + Topology testTopology = streamsBuilder.build(); Properties props = new Properties(); mergedProps.forEach(props::put); - TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props); + TopologyTestDriver td = new TopologyTestDriver(testTopology, props); Serde spanTypeOneSerde = new AvroSerde<>(); spanTypeOneSerde.configure(Map.of(), false); @@ -87,6 +93,8 @@ public void setUp() { "test-raw-service-type-output-topic", new StringSerde().deserializer(), rawServiceTypeSerde.deserializer()); + + logger.info("test topology: {}", testTopology.describe()); } @Test diff --git a/view-generator-framework/src/test/resources/log4j2.properties b/view-generator-framework/src/test/resources/log4j2.properties new file mode 100644 index 0000000..62c371c --- /dev/null +++ b/view-generator-framework/src/test/resources/log4j2.properties @@ -0,0 +1,8 @@ +status=error +name=PropertiesConfig +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +rootLogger.level=INFO +rootLogger.appenderRef.stdout.ref=STDOUT