Skip to content

Commit 3e32cea

Browse files
author
Dimitrii Lipiridi
committed
Make ConcurrentKafkaListenerContainerFactoryConfigurer generic to support configuring any container factory
Signed-off-by: Dimitrii Lipiridi <dimitrii.lipiridi@delasport.com>
1 parent 6f464e7 commit 3e32cea

File tree

3 files changed

+25
-24
lines changed

3 files changed

+25
-24
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

+17-16
Original file line numberDiff line numberDiff line change
@@ -44,31 +44,32 @@
4444
* @author Eddú Meléndez
4545
* @author Thomas Kåsene
4646
* @author Moritz Halbritter
47+
* @author Dimitrii Lipiridi
4748
* @since 1.5.0
4849
*/
49-
public class ConcurrentKafkaListenerContainerFactoryConfigurer {
50+
public class ConcurrentKafkaListenerContainerFactoryConfigurer<K, V> {
5051

5152
private KafkaProperties properties;
5253

5354
private BatchMessageConverter batchMessageConverter;
5455

5556
private RecordMessageConverter recordMessageConverter;
5657

57-
private RecordFilterStrategy<Object, Object> recordFilterStrategy;
58+
private RecordFilterStrategy<? super K, ? super V> recordFilterStrategy;
5859

59-
private KafkaTemplate<Object, Object> replyTemplate;
60+
private KafkaTemplate<?, ?> replyTemplate;
6061

61-
private KafkaAwareTransactionManager<Object, Object> transactionManager;
62+
private KafkaAwareTransactionManager<?, ?> transactionManager;
6263

6364
private ConsumerAwareRebalanceListener rebalanceListener;
6465

6566
private CommonErrorHandler commonErrorHandler;
6667

67-
private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
68+
private AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor;
6869

69-
private RecordInterceptor<Object, Object> recordInterceptor;
70+
private RecordInterceptor<K, V> recordInterceptor;
7071

71-
private BatchInterceptor<Object, Object> batchInterceptor;
72+
private BatchInterceptor<K, V> batchInterceptor;
7273

7374
private Function<MessageListenerContainer, String> threadNameSupplier;
7475

@@ -102,23 +103,23 @@ void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) {
102103
* Set the {@link RecordFilterStrategy} to use to filter incoming records.
103104
* @param recordFilterStrategy the record filter strategy
104105
*/
105-
void setRecordFilterStrategy(RecordFilterStrategy<Object, Object> recordFilterStrategy) {
106+
void setRecordFilterStrategy(RecordFilterStrategy<? super K, ? super V> recordFilterStrategy) {
106107
this.recordFilterStrategy = recordFilterStrategy;
107108
}
108109

109110
/**
110111
* Set the {@link KafkaTemplate} to use to send replies.
111112
* @param replyTemplate the reply template
112113
*/
113-
void setReplyTemplate(KafkaTemplate<Object, Object> replyTemplate) {
114+
void setReplyTemplate(KafkaTemplate<?, ?> replyTemplate) {
114115
this.replyTemplate = replyTemplate;
115116
}
116117

117118
/**
118119
* Set the {@link KafkaAwareTransactionManager} to use.
119120
* @param transactionManager the transaction manager
120121
*/
121-
void setTransactionManager(KafkaAwareTransactionManager<Object, Object> transactionManager) {
122+
void setTransactionManager(KafkaAwareTransactionManager<?, ?> transactionManager) {
122123
this.transactionManager = transactionManager;
123124
}
124125

@@ -144,23 +145,23 @@ public void setCommonErrorHandler(CommonErrorHandler commonErrorHandler) {
144145
* Set the {@link AfterRollbackProcessor} to use.
145146
* @param afterRollbackProcessor the after rollback processor
146147
*/
147-
void setAfterRollbackProcessor(AfterRollbackProcessor<Object, Object> afterRollbackProcessor) {
148+
void setAfterRollbackProcessor(AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor) {
148149
this.afterRollbackProcessor = afterRollbackProcessor;
149150
}
150151

151152
/**
152153
* Set the {@link RecordInterceptor} to use.
153154
* @param recordInterceptor the record interceptor.
154155
*/
155-
void setRecordInterceptor(RecordInterceptor<Object, Object> recordInterceptor) {
156+
void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
156157
this.recordInterceptor = recordInterceptor;
157158
}
158159

159160
/**
160161
* Set the {@link BatchInterceptor} to use.
161162
* @param batchInterceptor the batch interceptor.
162163
*/
163-
void setBatchInterceptor(BatchInterceptor<Object, Object> batchInterceptor) {
164+
void setBatchInterceptor(BatchInterceptor<K, V> batchInterceptor) {
164165
this.batchInterceptor = batchInterceptor;
165166
}
166167

@@ -187,14 +188,14 @@ void setListenerTaskExecutor(SimpleAsyncTaskExecutor listenerTaskExecutor) {
187188
* to configure
188189
* @param consumerFactory the {@link ConsumerFactory} to use
189190
*/
190-
public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerFactory,
191-
ConsumerFactory<Object, Object> consumerFactory) {
191+
public void configure(ConcurrentKafkaListenerContainerFactory<K, V> listenerFactory,
192+
ConsumerFactory<? super K, ? super V> consumerFactory) {
192193
listenerFactory.setConsumerFactory(consumerFactory);
193194
configureListenerFactory(listenerFactory);
194195
configureContainer(listenerFactory.getContainerProperties());
195196
}
196197

197-
private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory<Object, Object> factory) {
198+
private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory<K, V> factory) {
198199
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
199200
Listener properties = this.properties.getListener();
200201
map.from(properties::getConcurrency).to(factory::setConcurrency);

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -114,23 +114,23 @@ class KafkaAnnotationDrivenConfiguration {
114114
@Bean
115115
@ConditionalOnMissingBean
116116
@ConditionalOnThreading(Threading.PLATFORM)
117-
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
117+
ConcurrentKafkaListenerContainerFactoryConfigurer<Object, Object> kafkaListenerContainerFactoryConfigurer() {
118118
return configurer();
119119
}
120120

121121
@Bean(name = "kafkaListenerContainerFactoryConfigurer")
122122
@ConditionalOnMissingBean
123123
@ConditionalOnThreading(Threading.VIRTUAL)
124-
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurerVirtualThreads() {
125-
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = configurer();
124+
ConcurrentKafkaListenerContainerFactoryConfigurer<Object, Object> kafkaListenerContainerFactoryConfigurerVirtualThreads() {
125+
ConcurrentKafkaListenerContainerFactoryConfigurer<Object, Object> configurer = configurer();
126126
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("kafka-");
127127
executor.setVirtualThreads(true);
128128
configurer.setListenerTaskExecutor(executor);
129129
return configurer;
130130
}
131131

132-
private ConcurrentKafkaListenerContainerFactoryConfigurer configurer() {
133-
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
132+
private ConcurrentKafkaListenerContainerFactoryConfigurer<Object, Object> configurer() {
133+
ConcurrentKafkaListenerContainerFactoryConfigurer<Object, Object> configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer<>();
134134
configurer.setKafkaProperties(this.properties);
135135
configurer.setBatchMessageConverter(this.batchMessageConverter);
136136
configurer.setRecordMessageConverter(this.recordMessageConverter);
@@ -149,7 +149,7 @@ private ConcurrentKafkaListenerContainerFactoryConfigurer configurer() {
149149
@Bean
150150
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
151151
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
152-
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
152+
ConcurrentKafkaListenerContainerFactoryConfigurer<Object, Object> configurer,
153153
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory,
154154
ObjectProvider<ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>>> kafkaContainerCustomizer) {
155155
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
*/
4040
class ConcurrentKafkaListenerContainerFactoryConfigurerTests {
4141

42-
private ConcurrentKafkaListenerContainerFactoryConfigurer configurer;
42+
private ConcurrentKafkaListenerContainerFactoryConfigurer<Object, Object> configurer;
4343

4444
private ConcurrentKafkaListenerContainerFactory<Object, Object> factory;
4545

@@ -50,7 +50,7 @@ class ConcurrentKafkaListenerContainerFactoryConfigurerTests {
5050
@BeforeEach
5151
@SuppressWarnings("unchecked")
5252
void setUp() {
53-
this.configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
53+
this.configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer<>();
5454
this.properties = new KafkaProperties();
5555
this.configurer.setKafkaProperties(this.properties);
5656
this.factory = spy(new ConcurrentKafkaListenerContainerFactory<>());

0 commit comments

Comments
 (0)