diff --git a/.github/workflows/check-samples.yml b/.github/workflows/check-samples.yml new file mode 100644 index 00000000..b3a7c927 --- /dev/null +++ b/.github/workflows/check-samples.yml @@ -0,0 +1,57 @@ +name: Check Samples + +on: + workflow_dispatch: + +env: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} + GRADLE_ENTERPRISE_CACHE_USER: ${{ secrets.GRADLE_ENTERPRISE_CACHE_USER }} + GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GRADLE_ENTERPRISE_CACHE_PASSWORD }} + GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GRADLE_ENTERPRISE_SECRET_ACCESS_KEY }} + COMMIT_OWNER: ${{ github.event.pusher.name }} + COMMIT_SHA: ${{ github.sha }} + ARTIFACTORY_USERNAME: ${{ secrets.ARTIFACTORY_USERNAME }} + ARTIFACTORY_PASSWORD: ${{ secrets.ARTIFACTORY_PASSWORD }} + +jobs: + prerequisites: + name: Pre-requisites for building + runs-on: ubuntu-latest + if: github.repository == 'spring-projects/spring-pulsar' + outputs: + runjobs: ${{ steps.continue.outputs.runjobs }} + project_version: ${{ steps.continue.outputs.project_version }} + steps: + - uses: actions/checkout@v3 + - id: continue + name: Determine if should continue + run: | + # Run jobs if in upstream repository + echo "runjobs=true" >>$GITHUB_OUTPUT + # Extract version from gradle.properties + version=$(cat gradle.properties | grep "version=" | awk -F'=' '{print $2}') + echo "project_version=$version" >>$GITHUB_OUTPUT + check_samples: + name: Check Samples project + needs: [prerequisites] + runs-on: ubuntu-latest + if: needs.prerequisites.outputs.runjobs + steps: + - uses: actions/checkout@v3 + - name: Set up gradle + uses: spring-io/spring-gradle-build-action@v2 + with: + java-version: 17 + distribution: temurin + - name: Check samples project + env: + LOCAL_REPOSITORY_PATH: ${{ github.workspace }}/build/publications/repos + SAMPLES_DIR: + VERSION: ${{ needs.prerequisites.outputs.project_version }} + run: | + ./gradlew publishMavenJavaPublicationToLocalRepository + ./gradlew \ + --init-script ./spring-pulsar-sample-apps/sample-apps-check-ci.gradle \ + -PlocalRepositoryPath="$LOCAL_REPOSITORY_PATH" \ + -PspringPulsarVersion="$VERSION" \ + :runAllSampleTests diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index da95c9c5..cf3dd4f6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -99,6 +99,30 @@ jobs: context-root: spring-pulsar cloudflare-zone-id: ${{ secrets.CLOUDFLARE_ZONE_ID }} cloudflare-cache-token: ${{ secrets.CLOUDFLARE_CACHE_TOKEN }} + check_samples: + name: Check Samples project + needs: [prerequisites] + runs-on: ubuntu-latest + if: needs.prerequisites.outputs.runjobs + steps: + - uses: actions/checkout@v3 + - name: Set up gradle + uses: spring-io/spring-gradle-build-action@v2 + with: + java-version: 17 + distribution: temurin + - name: Check samples project + env: + LOCAL_REPOSITORY_PATH: ${{ github.workspace }}/build/publications/repos + SAMPLES_DIR: + VERSION: ${{ needs.prerequisites.outputs.project_version }} + run: | + ./gradlew publishMavenJavaPublicationToLocalRepository + ./gradlew \ + --init-script ./spring-pulsar-sample-apps/sample-apps-check-ci.gradle \ + -PlocalRepositoryPath="$LOCAL_REPOSITORY_PATH" \ + -PspringPulsarVersion="$VERSION" \ + :runAllSampleTests perform_release: name: Perform Release needs: [prerequisites, build_deploy_jdk_17, deploy_docs_antora] diff --git a/build.gradle b/build.gradle index 1f854018..6195e16c 100644 --- a/build.gradle +++ b/build.gradle @@ -42,3 +42,14 @@ nohttp { check { dependsOn checkstyleNohttp } + +tasks.register('runAllSampleTests') { + var allTasks = rootProject.getAllTasks(true) + .findAll {it.key.project.name.startsWith("sample-") } + var allTestsTasks = allTasks.values().collect { t -> + t.findAll { it.name == 'test' || it.name == 'integrationTest' } + }.flatten() + it.dependsOn { + allTestsTasks + } +} diff --git a/buildSrc/src/main/java/org/springframework/pulsar/gradle/check/JacocoConventionsPlugin.java b/buildSrc/src/main/java/org/springframework/pulsar/gradle/check/JacocoConventionsPlugin.java index 8808da51..21acbb8e 100644 --- a/buildSrc/src/main/java/org/springframework/pulsar/gradle/check/JacocoConventionsPlugin.java +++ b/buildSrc/src/main/java/org/springframework/pulsar/gradle/check/JacocoConventionsPlugin.java @@ -20,7 +20,7 @@ public void apply(final Project project) { project.getPlugins().withType(JavaPlugin.class, (javaPlugin) -> { project.getPluginManager().apply(JacocoPlugin.class); project.getExtensions().configure(JacocoPluginExtension.class, - (jacocoExtension) -> jacocoExtension.setToolVersion("0.8.7")); + (jacocoExtension) -> jacocoExtension.setToolVersion("0.8.9")); project.getTasks().withType(Test.class, (test) -> project.getTasks().withType(JacocoReport.class, test::finalizedBy)); }); diff --git a/gradle/aggregate-jacoco-report.gradle b/gradle/aggregate-jacoco-report.gradle index 3b7798ca..87c46149 100644 --- a/gradle/aggregate-jacoco-report.gradle +++ b/gradle/aggregate-jacoco-report.gradle @@ -1,7 +1,7 @@ apply plugin: 'jacoco' jacoco { - toolVersion '0.8.7' + toolVersion '0.8.9' } tasks.withType(Test) { @@ -26,8 +26,9 @@ project.afterEvaluate { def modulesToAggregate = subprojects.findAll { proj -> def isSampleApp = proj.getParent().getPath().contains('spring-pulsar-sample-apps') + def isTestModule = proj.getName() == 'spring-pulsar-test' def path = "${proj.projectDir}/src/main/java" - !(isSampleApp) && (new File(path)).exists() + !(isSampleApp) && !(isTestModule) && (new File(path)).exists() } def classes = modulesToAggregate.collect { diff --git a/settings.gradle b/settings.gradle index c988a81f..d0e8431f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -17,7 +17,7 @@ include 'spring-pulsar-cache-provider' include 'spring-pulsar-cache-provider-caffeine' include 'spring-pulsar-reactive' include 'spring-pulsar-dependencies' -include 'spring-pulsar-sample-apps:sample-app1' +include 'spring-pulsar-sample-apps:sample-imperative-produce-consume' include 'spring-pulsar-sample-apps:sample-app2' include 'spring-pulsar-sample-apps:sample-pulsar-functions:sample-signup-app' include 'spring-pulsar-sample-apps:sample-pulsar-functions:sample-signup-function' diff --git a/spring-pulsar-sample-apps/sample-app1/src/main/java/app1/SpringPulsarBootApp.java b/spring-pulsar-sample-apps/sample-app1/src/main/java/app1/SpringPulsarBootApp.java deleted file mode 100644 index 28165558..00000000 --- a/spring-pulsar-sample-apps/sample-app1/src/main/java/app1/SpringPulsarBootApp.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Copyright 2022-2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package app1; - -import java.util.List; -import java.util.UUID; - -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.common.schema.SchemaType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.springframework.boot.ApplicationRunner; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.pulsar.annotation.PulsarListener; -import org.springframework.pulsar.core.DefaultSchemaResolver; -import org.springframework.pulsar.core.PulsarProducerFactory; -import org.springframework.pulsar.core.PulsarTemplate; -import org.springframework.pulsar.core.PulsarTopic; -import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer; - -@SpringBootApplication -public class SpringPulsarBootApp { - - private static final Logger logger = LoggerFactory.getLogger(SpringPulsarBootApp.class); - - public static void main(String[] args) { - SpringApplication.run(SpringPulsarBootApp.class, args); - } - - /* - * Basic publisher using PulsarTemplate and a PulsarListener using an - * exclusive subscription to consume. - */ - @Bean - ApplicationRunner runner1(PulsarTemplate pulsarTemplate) { - - String topic1 = "hello-pulsar-exclusive-1"; - - return args -> { - for (int i = 0; i < 10; i++) { - pulsarTemplate.send(topic1, "This is message " + (i + 1)); - } - }; - } - - @PulsarListener(subscriptionName = "subscription-1", topics = "hello-pulsar-exclusive-1") - void listen1(String message) { - logger.info(message); - } - - /* - * Basic publisher using PulsarTemplate and a PulsarListener using an - * exclusive subscription to consume. - */ - @Bean - ApplicationRunner runner2(PulsarTemplate pulsarTemplate) { - - String topic1 = "hello-pulsar-exclusive-2"; - - return args -> { - for (int i = 0; i < 10; i++) { - pulsarTemplate.send(topic1, i); - } - }; - } - - @PulsarListener(subscriptionName = "subscription-2", topics = "hello-pulsar-exclusive-2") - void listen2(Integer message) { - logger.info("Message received :" + message); - } - - /* - * Demonstrating more complex types for publishing using JSON schema and the - * associated PulsarListener using an exclusive subscription. - */ - @Bean - ApplicationRunner runner3(PulsarProducerFactory producerFactory) { - - String topic = "hello-pulsar-exclusive-3"; - PulsarTemplate pulsarTemplate = new PulsarTemplate<>(producerFactory); - return args -> { - for (int i = 0; i < 10; i++) { - Foo foo = new Foo(i + "-" + "Foo-" + UUID.randomUUID(), i + "-" + "Bar-" + UUID.randomUUID()); - pulsarTemplate.send(topic, foo, Schema.JSON(Foo.class)); - } - }; - } - - @PulsarListener(subscriptionName = "subscription-3", topics = "hello-pulsar-exclusive-3", - schemaType = SchemaType.JSON) - void listen3(Foo message) { - logger.info("Message received :" + message); - } - - /* - * Create a partitioned topic using PulsarAdministration and then publish to the topic - * and consume from it. - */ - @Bean - PulsarTopic partitionedTopic4() { - return PulsarTopic.builder("hello-pulsar-partitioned-4").numberOfPartitions(3).build(); - } - - @Bean - ApplicationRunner runner4(PulsarTemplate pulsarTemplate) { - return args -> { - for (int i = 0; i < 10; i++) { - pulsarTemplate.send("hello-pulsar-partitioned-4", "This is message " + (i + 1)); - } - }; - } - - @PulsarListener(subscriptionName = "subscription-4", topics = "hello-pulsar-partitioned-4") - void listen4(String message) { - logger.info("Message received from partitioned-topic : " + message); - } - - /* - * Publish and then use PulsarListener in batch listening mode. - */ - @Bean - ApplicationRunner runner5(PulsarProducerFactory producerFactory) { - - String topic = "hello-pulsar-exclusive-5"; - PulsarTemplate pulsarTemplate = new PulsarTemplate<>(producerFactory); - return args -> { - for (int i = 0; i < 100; i++) { - Foo foo = new Foo(i + "-" + "Foo-" + UUID.randomUUID(), i + "-" + "Bar-" + UUID.randomUUID()); - pulsarTemplate.send(topic, foo, Schema.JSON(Foo.class)); - } - }; - } - - @PulsarListener(subscriptionName = "subscription-5", topics = "hello-pulsar-exclusive-5", - schemaType = SchemaType.JSON, batch = true) - void listen5(List messages) { - logger.info("records received :" + messages.size()); - for (Foo message : messages) { - logger.info("record : " + message); - } - } - - record Foo(String foo, String bar) { - } - - @Configuration(proxyBeanMethods = false) - static class CustomSchemaAndTopicMappingConfig { - - // The topic mappings are in test/resources/application.yml - - @Bean - SchemaResolverCustomizer schemaMappingsCustomizer() { - return (schemaResolver) -> { - schemaResolver.addCustomSchemaMapping(Bar.class, Schema.JSON(Bar.class)); - schemaResolver.addCustomSchemaMapping(Zaa.class, Schema.JSON(Zaa.class)); - }; - } - - @Bean - ApplicationRunner sendBarWithoutTopicOrSchema(PulsarTemplate template) { - return args -> template.send(new Bar("hello bar")); - } - - @Bean - ApplicationRunner sendZaaWithoutTopicOrSchema(PulsarTemplate template) { - return args -> template.send(new Zaa("hello zaa")); - } - - @PulsarListener - void listenBarWithoutTopicOrSchema(Bar message) { - logger.info("BAR LISTENER -> " + message); - } - - @PulsarListener - void listenZaaWithoutTopicOrSchema(Zaa message) { - logger.info("ZAA LISTENER -> " + message); - } - - public record Bar(String value) { - } - - public record Zaa(String value) { - } - - } - -} diff --git a/spring-pulsar-sample-apps/sample-app1/src/main/resources/application.yml b/spring-pulsar-sample-apps/sample-app1/src/main/resources/application.yml deleted file mode 100644 index c5c45f5e..00000000 --- a/spring-pulsar-sample-apps/sample-app1/src/main/resources/application.yml +++ /dev/null @@ -1,8 +0,0 @@ -spring: - pulsar: - defaults: - type-mappings: - - message-type: app1.SpringPulsarBootApp$CustomSchemaAndTopicMappingConfig$Bar - topic-name: bar-topic - - message-type: app1.SpringPulsarBootApp$CustomSchemaAndTopicMappingConfig$Zaa - topic-name: zaa-topic diff --git a/spring-pulsar-sample-apps/sample-apps-check-ci.gradle b/spring-pulsar-sample-apps/sample-apps-check-ci.gradle new file mode 100644 index 00000000..29a21a9c --- /dev/null +++ b/spring-pulsar-sample-apps/sample-apps-check-ci.gradle @@ -0,0 +1,27 @@ +allprojects { + configurations.all { + resolutionStrategy { + force "org.springframework.pulsar:spring-pulsar:$springPulsarVersion" + force "org.springframework.pulsar:spring-pulsar-cache-provider:$springPulsarVersion" + force "org.springframework.pulsar:spring-pulsar-cache-provider-caffeine:$springPulsarVersion" + force "org.springframework.pulsar:spring-pulsar-reactive:$springPulsarVersion" + force "org.springframework.pulsar:spring-pulsar-dependencies:$springPulsarVersion" + } + } + repositories { + exclusiveContent { + forRepository { + maven { + url "file://$localRepositoryPath" + } + } + filter { + includeVersion "org.springframework.pulsar", "spring-pulsar", "$springPulsarVersion" + includeVersion "org.springframework.pulsar", "spring-pulsar-cache-provider", "$springPulsarVersion" + includeVersion "org.springframework.pulsar", "spring-pulsar-cache-provider-caffeine", "$springPulsarVersion" + includeVersion "org.springframework.pulsar", "spring-pulsar-reactive", "$springPulsarVersion" + includeVersion "org.springframework.pulsar", "spring-pulsar-dependencies", "$springPulsarVersion" + } + } + } +} diff --git a/spring-pulsar-sample-apps/sample-app1/build.gradle b/spring-pulsar-sample-apps/sample-imperative-produce-consume/build.gradle similarity index 56% rename from spring-pulsar-sample-apps/sample-app1/build.gradle rename to spring-pulsar-sample-apps/sample-imperative-produce-consume/build.gradle index 0beffc18..e3170a32 100644 --- a/spring-pulsar-sample-apps/sample-app1/build.gradle +++ b/spring-pulsar-sample-apps/sample-imperative-produce-consume/build.gradle @@ -4,7 +4,7 @@ plugins { id 'io.spring.dependency-management' version '1.1.0' } -description = 'Spring Pulsar Sample Application (Send and Receive)' +description = 'Spring Pulsar Sample (Imperative Produce and Consume)' repositories { mavenCentral() @@ -18,6 +18,20 @@ dependencies { implementation 'io.micrometer:micrometer-tracing-bridge-brave' implementation 'io.zipkin.reporter2:zipkin-reporter-brave' implementation 'io.zipkin.reporter2:zipkin-sender-urlconnection' + + testImplementation project(':spring-pulsar-test') + testRuntimeOnly 'ch.qos.logback:logback-classic' + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' + testImplementation 'org.awaitility:awaitility' + testImplementation "org.springframework.boot:spring-boot-starter-test:${springBootVersion}" + testImplementation "org.springframework.boot:spring-boot-testcontainers:${springBootVersion}" + testImplementation 'org.testcontainers:junit-jupiter' + testImplementation 'org.testcontainers:pulsar' +} + +test { + useJUnitPlatform() + testLogging.showStandardStreams = true } bootRun { diff --git a/spring-pulsar-sample-apps/sample-imperative-produce-consume/src/main/java/com/example/ImperativeProduceAndConsumeApp.java b/spring-pulsar-sample-apps/sample-imperative-produce-consume/src/main/java/com/example/ImperativeProduceAndConsumeApp.java new file mode 100644 index 00000000..6938fe8b --- /dev/null +++ b/spring-pulsar-sample-apps/sample-imperative-produce-consume/src/main/java/com/example/ImperativeProduceAndConsumeApp.java @@ -0,0 +1,170 @@ +/* + * Copyright 2022-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.pulsar.annotation.PulsarListener; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.core.PulsarTopic; + +@SpringBootApplication +public class ImperativeProduceAndConsumeApp { + + private static final Logger LOG = LoggerFactory.getLogger(ImperativeProduceAndConsumeApp.class); + + public static void main(String[] args) { + SpringApplication.run(ImperativeProduceAndConsumeApp.class, args); + } + + @Configuration(proxyBeanMethods = false) + static class ProduceConsumeWithPrimitiveMessageType { + + private static final String TOPIC = "produce-consume-primitive"; + + @Bean + ApplicationRunner sendPrimitiveMessagesToPulsarTopic(PulsarTemplate template) { + return (args) -> { + for (int i = 0; i < 10; i++) { + var msg = "ProduceConsumeWithPrimitiveMessageType:" + i; + template.send(TOPIC, msg); + LOG.info("++++++PRODUCE {}------", msg); + } + }; + } + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC+"-sub") + void consumePrimitiveMessagesFromPulsarTopic(String msg) { + LOG.info("++++++CONSUME {}------", msg); + } + + } + + + @Configuration(proxyBeanMethods = false) + static class ProduceConsumeWithComplexMessageType { + + private static final String TOPIC = "produce-consume-complex"; + + @Bean + ApplicationRunner sendComplexMessagesToPulsarTopic(PulsarTemplate template) { + return (args) -> { + for (int i = 0; i < 10; i++) { + var msg = new Foo("ProduceConsumeWithComplexMessageType", i); + template.send(TOPIC, msg); + LOG.info("++++++PRODUCE {}------", msg); + } + }; + } + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC+"-sub") + void consumeComplexMessagesFromPulsarTopic(Foo msg) { + LOG.info("++++++CONSUME {}------", msg); + } + + } + + + @Configuration(proxyBeanMethods = false) + static class ProduceConsumeWithPartitions { + + private static final String TOPIC = "produce-consume-partitions"; + + @Bean + PulsarTopic partitionedTopic() { + return PulsarTopic.builder(TOPIC).numberOfPartitions(3).build(); + } + + @Bean + ApplicationRunner sendPartitionedMessagesToPulsarTopic(PulsarTemplate template) { + return (args) -> { + for (int i = 0; i < 10; i++) { + var msg = "ProduceConsumeWithPartitions:" + i; + template.send(TOPIC, msg); + LOG.info("++++++PRODUCE {}------", msg); + } + }; + } + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC+"-sub") + void consumePartitionedMessagesFromPulsarTopic(String msg) { + LOG.info("++++++CONSUME {}------", msg); + } + + } + + + @Configuration(proxyBeanMethods = false) + static class ProduceConsumeBatchListener { + + private static final String TOPIC = "produce-consume-batch"; + + @Bean + ApplicationRunner sendBatchMessagesToPulsarTopic(PulsarTemplate template) { + return (args) -> { + for (int i = 0; i < 100; i++) { + var msg = new Foo("ProduceConsumeBatchListener", i); + template.send(TOPIC, msg); + LOG.info("++++++PRODUCE {}------", msg); + } + }; + } + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC+"-sub", batch = true) + void consumeBatchMessagesFromPulsarTopic(List messages) { + messages.forEach((msg) -> LOG.info("++++++CONSUME {}------", msg)); + } + + } + + + @Configuration(proxyBeanMethods = false) + static class ProduceConsumeDefaultMappings { + + @Bean + ApplicationRunner sendBarWithoutTopicOrSchema(PulsarTemplate template) { + return (args) -> { + for (int i = 0; i < 10; i++) { + var msg = new Bar("ProduceConsumeDefaultMappings:" + i); + // Default topic and schema mappings are in application.yml + template.send(msg); + LOG.info("++++++PRODUCE {}------", msg); + } + }; + } + + @PulsarListener + void consumeBarWithoutTopicOrSchema(Bar msg) { + LOG.info("++++++CONSUME {}------", msg); + } + + } + + record Foo(String name, Integer value) { + } + + public record Bar(String value) { + } +} diff --git a/spring-pulsar-sample-apps/sample-app1/src/main/java/app1/package-info.java b/spring-pulsar-sample-apps/sample-imperative-produce-consume/src/main/java/com/example/package-info.java similarity index 89% rename from spring-pulsar-sample-apps/sample-app1/src/main/java/app1/package-info.java rename to spring-pulsar-sample-apps/sample-imperative-produce-consume/src/main/java/com/example/package-info.java index 4bf428b5..5f2a90d1 100644 --- a/spring-pulsar-sample-apps/sample-app1/src/main/java/app1/package-info.java +++ b/spring-pulsar-sample-apps/sample-imperative-produce-consume/src/main/java/com/example/package-info.java @@ -3,7 +3,7 @@ */ @NonNullApi @NonNullFields -package app1; +package com.example; import org.springframework.lang.NonNullApi; import org.springframework.lang.NonNullFields; diff --git a/spring-pulsar-sample-apps/sample-imperative-produce-consume/src/main/resources/application.yml b/spring-pulsar-sample-apps/sample-imperative-produce-consume/src/main/resources/application.yml new file mode 100644 index 00000000..1eee440f --- /dev/null +++ b/spring-pulsar-sample-apps/sample-imperative-produce-consume/src/main/resources/application.yml @@ -0,0 +1,8 @@ +spring: + pulsar: + defaults: + type-mappings: + - message-type: com.example.ImperativeProduceAndConsumeApp$Bar + topic-name: bar-topic + schema-info: + schema-type: JSON diff --git a/spring-pulsar-sample-apps/sample-imperative-produce-consume/src/test/java/com/example/ImperativeProduceAndConsumeAppTests.java b/spring-pulsar-sample-apps/sample-imperative-produce-consume/src/test/java/com/example/ImperativeProduceAndConsumeAppTests.java new file mode 100644 index 00000000..c23abfc8 --- /dev/null +++ b/spring-pulsar-sample-apps/sample-imperative-produce-consume/src/test/java/com/example/ImperativeProduceAndConsumeAppTests.java @@ -0,0 +1,88 @@ +/* + * Copyright 2012-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import java.util.stream.IntStream; + +import com.example.ImperativeProduceAndConsumeApp.Bar; +import com.example.ImperativeProduceAndConsumeApp.Foo; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.system.CapturedOutput; +import org.springframework.boot.test.system.OutputCaptureExtension; +import org.springframework.pulsar.test.support.PulsarTestContainerSupport; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; + +import static org.assertj.core.api.Assertions.assertThat; + +@SpringBootTest +@ExtendWith(OutputCaptureExtension.class) +class ImperativeProduceAndConsumeAppTests implements PulsarTestContainerSupport { + + @DynamicPropertySource + static void pulsarProperties(DynamicPropertyRegistry registry) { + registry.add("spring.pulsar.client.service-url", PULSAR_CONTAINER::getPulsarBrokerUrl); + registry.add("spring.pulsar.admin.service-url", PULSAR_CONTAINER::getHttpServiceUrl); + } + + @Test + void produceConsumeWithPrimitiveMessageType(CapturedOutput output) { + verifyProduceConsume(output,10, (i) -> "ProduceConsumeWithPrimitiveMessageType:" + i); + } + + @Test + void produceConsumeWithComplexMessageType(CapturedOutput output) { + verifyProduceConsume(output,10, + (i) -> new Foo("ProduceConsumeWithComplexMessageType", i)); + } + + @Test + void produceConsumeWithPartitions(CapturedOutput output) { + verifyProduceConsume(output,10, (i) -> "ProduceConsumeWithPartitions:" + i); + } + + @Test + void produceConsumeBatchListener(CapturedOutput output) { + verifyProduceConsume(output,100, + (i) -> new Foo("ProduceConsumeBatchListener", i)); + } + + @Test + void produceConsumeDefaultMappings(CapturedOutput output) { + verifyProduceConsume(output,10, (i) -> new Bar("ProduceConsumeDefaultMappings:" + i)); + } + + private void verifyProduceConsume(CapturedOutput output, int numExpectedMessages, + Function expectedMessageFactory) { + List < String > expectedOutput = new ArrayList<>(); + IntStream.range(0, numExpectedMessages).forEachOrdered((i) -> { + var msg = expectedMessageFactory.apply(i); + expectedOutput.add("++++++PRODUCE %s------".formatted(msg)); + expectedOutput.add("++++++CONSUME %s------".formatted(msg)); + }); + Awaitility.waitAtMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(output).contains(expectedOutput)); + } +} diff --git a/spring-pulsar-sample-apps/sample-imperative-produce-consume/src/test/resources/logback-test.xml b/spring-pulsar-sample-apps/sample-imperative-produce-consume/src/test/resources/logback-test.xml new file mode 100644 index 00000000..97f7e370 --- /dev/null +++ b/spring-pulsar-sample-apps/sample-imperative-produce-consume/src/test/resources/logback-test.xml @@ -0,0 +1,14 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n + + + + + + + + + +