Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions consumer/.gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/gradlew text eol=lf
*.bat text eol=crlf
*.jar binary
37 changes: 37 additions & 0 deletions consumer/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
HELP.md
.gradle
build/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/

### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
bin/
!**/src/main/**/bin/
!**/src/test/**/bin/

### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
out/
!**/src/main/**/out/
!**/src/test/**/out/

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/

### VS Code ###
.vscode/
9 changes: 7 additions & 2 deletions build.gradle.kts → consumer/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
plugins {
java
id("org.springframework.boot") version "3.4.3"
id("org.springframework.boot") version "3.4.5"
id("io.spring.dependency-management") version "1.1.7"
}

group = "org"
group = "org.javaspringcourse"
version = "0.0.1-SNAPSHOT"

java {
Expand All @@ -19,7 +19,12 @@ repositories {

dependencies {
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.kafka:spring-kafka")
compileOnly("org.projectlombok:lombok")
annotationProcessor("org.projectlombok:lombok")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
}

Expand Down
Binary file added consumer/gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.12.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.13-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
5 changes: 2 additions & 3 deletions gradlew → consumer/gradlew

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

File renamed without changes.
1 change: 1 addition & 0 deletions consumer/settings.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
rootProject.name = "consumer"
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package org.javaspringcourse;
package org.javaspringcourse.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class JavaSpringCourseApplication {
public class ConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(JavaSpringCourseApplication.class, args);
SpringApplication.run(ConsumerApplication.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.javaspringcourse.consumer.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.javaspringcourse.consumer.dto.MessageDto;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.CommonLoggingErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
@EnableKafka
@RequiredArgsConstructor
public class KafkaConfig {
public static final String CONSUMER_LISTENER_CONTAINER_FACTORY = "consumerListenerContainerFactory";

private final KafkaProperties kafkaProperties;
private final ObjectMapper objectMapper;

@Bean(CONSUMER_LISTENER_CONTAINER_FACTORY)
public ConcurrentKafkaListenerContainerFactory<String, MessageDto> consumerListenerContainerFactory() {
return buildListenerContainerFactory(MessageDto.class);
}

<T> ConcurrentKafkaListenerContainerFactory<String, T> buildListenerContainerFactory(Class<T> clazz) {
var consumerFactory = new DefaultKafkaConsumerFactory<>(
kafkaProperties.buildConsumerProperties(),
new StringDeserializer(),
new ErrorHandlingDeserializer<>(
new JsonDeserializer<>(
clazz, objectMapper, false
)
)
);

var listenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<String, T>();
listenerContainerFactory.setConsumerFactory(consumerFactory);
listenerContainerFactory.setCommonErrorHandler(commonLoggingErrorHandler());
return listenerContainerFactory;
}

@Bean
public CommonLoggingErrorHandler commonLoggingErrorHandler() { // только логирование ошибки
return new CommonLoggingErrorHandler();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package org.javaspringcourse.consumer.dto;

public record MessageDto(String message) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.javaspringcourse.consumer.listener;

import lombok.extern.log4j.Log4j2;
import org.javaspringcourse.consumer.config.KafkaConfig;
import org.javaspringcourse.consumer.dto.MessageDto;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Log4j2
@Component
public class MessageListener {

@KafkaListener(
containerFactory = KafkaConfig.CONSUMER_LISTENER_CONTAINER_FACTORY,
topics = "${javaspringcourse.kafka.message.topic}",
groupId = "${javaspringcourse.kafka.message.groupId}"
)
public void receiveMessage(@Payload MessageDto message) {
log.info("Received message: {}", message.message());
}
}
12 changes: 12 additions & 0 deletions consumer/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
server:
port: 8082

spring:
kafka:
bootstrap-servers: [localhost:9092]

javaspringcourse:
kafka:
message:
topic: "message.incoming"
groupId: "message.incoming.id"
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package org.javaspringcourse;
package org.javaspringcourse.consumer;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class JavaSpringCourseApplicationTests {
class ConsumerApplicationTests {

@Test
void contextLoads() {
Expand Down
33 changes: 33 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
name: java-spring-course

services:
kafka:
image: apache/kafka:latest
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka:19092'
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- '9092:9092'
networks:
- common

kafka-ui:
image: provectuslabs/kafka-ui:latest
environment:
KAFKA_CLUSTERS_0_NAME: 'cluster-0'
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: 'kafka:19092'
ports:
- '8081:8080'
networks:
- common

networks:
common:
Binary file removed gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
3 changes: 3 additions & 0 deletions producer/.gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/gradlew text eol=lf
*.bat text eol=crlf
*.jar binary
37 changes: 37 additions & 0 deletions producer/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
HELP.md
.gradle
build/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/

### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
bin/
!**/src/main/**/bin/
!**/src/test/**/bin/

### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
out/
!**/src/main/**/out/
!**/src/test/**/out/

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/

### VS Code ###
.vscode/
33 changes: 33 additions & 0 deletions producer/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
plugins {
java
id("org.springframework.boot") version "3.4.5"
id("io.spring.dependency-management") version "1.1.7"
}

group = "org.javaspringcourse"
version = "0.0.1-SNAPSHOT"

java {
toolchain {
languageVersion = JavaLanguageVersion.of(23)
}
}

repositories {
mavenCentral()
}

dependencies {
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.kafka:spring-kafka")
implementation("org.springframework.boot:spring-boot-starter-web")
compileOnly("org.projectlombok:lombok")
annotationProcessor("org.projectlombok:lombok")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
}

tasks.withType<Test> {
useJUnitPlatform()
}
Binary file added producer/gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
7 changes: 7 additions & 0 deletions producer/gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.13-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Loading