Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ConcurrentKafkaListenerContainerFactoryConfigurer Generic #44638

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,32 @@
* @author Eddú Meléndez
* @author Thomas Kåsene
* @author Moritz Halbritter
* @author Dimitrii Lipiridi
* @since 1.5.0
*/
public class ConcurrentKafkaListenerContainerFactoryConfigurer {
public class ConcurrentKafkaListenerContainerFactoryConfigurer<K, V> {

private KafkaProperties properties;

private BatchMessageConverter batchMessageConverter;

private RecordMessageConverter recordMessageConverter;

private RecordFilterStrategy<Object, Object> recordFilterStrategy;
private RecordFilterStrategy<? super K, ? super V> recordFilterStrategy;

private KafkaTemplate<Object, Object> replyTemplate;
private KafkaTemplate<?, ?> replyTemplate;

private KafkaAwareTransactionManager<Object, Object> transactionManager;
private KafkaAwareTransactionManager<?, ?> transactionManager;

private ConsumerAwareRebalanceListener rebalanceListener;

private CommonErrorHandler commonErrorHandler;

private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
private AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor;

private RecordInterceptor<Object, Object> recordInterceptor;
private RecordInterceptor<K, V> recordInterceptor;

private BatchInterceptor<Object, Object> batchInterceptor;
private BatchInterceptor<K, V> batchInterceptor;

private Function<MessageListenerContainer, String> threadNameSupplier;

Expand All @@ -78,47 +79,47 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
* Set the {@link KafkaProperties} to use.
* @param properties the properties
*/
void setKafkaProperties(KafkaProperties properties) {
public void setKafkaProperties(KafkaProperties properties) {
this.properties = properties;
}

/**
* Set the {@link BatchMessageConverter} to use.
* @param batchMessageConverter the message converter
*/
void setBatchMessageConverter(BatchMessageConverter batchMessageConverter) {
public void setBatchMessageConverter(BatchMessageConverter batchMessageConverter) {
this.batchMessageConverter = batchMessageConverter;
}

/**
* Set the {@link RecordMessageConverter} to use.
* @param recordMessageConverter the message converter
*/
void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) {
public void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) {
this.recordMessageConverter = recordMessageConverter;
}

/**
* Set the {@link RecordFilterStrategy} to use to filter incoming records.
* @param recordFilterStrategy the record filter strategy
*/
void setRecordFilterStrategy(RecordFilterStrategy<Object, Object> recordFilterStrategy) {
public void setRecordFilterStrategy(RecordFilterStrategy<? super K, ? super V> recordFilterStrategy) {
this.recordFilterStrategy = recordFilterStrategy;
}

/**
* Set the {@link KafkaTemplate} to use to send replies.
* @param replyTemplate the reply template
*/
void setReplyTemplate(KafkaTemplate<Object, Object> replyTemplate) {
public void setReplyTemplate(KafkaTemplate<?, ?> replyTemplate) {
this.replyTemplate = replyTemplate;
}

/**
* Set the {@link KafkaAwareTransactionManager} to use.
* @param transactionManager the transaction manager
*/
void setTransactionManager(KafkaAwareTransactionManager<Object, Object> transactionManager) {
public void setTransactionManager(KafkaAwareTransactionManager<?, ?> transactionManager) {
this.transactionManager = transactionManager;
}

Expand All @@ -127,7 +128,7 @@ void setTransactionManager(KafkaAwareTransactionManager<Object, Object> transact
* @param rebalanceListener the rebalance listener.
* @since 2.2
*/
void setRebalanceListener(ConsumerAwareRebalanceListener rebalanceListener) {
public void setRebalanceListener(ConsumerAwareRebalanceListener rebalanceListener) {
this.rebalanceListener = rebalanceListener;
}

Expand All @@ -144,39 +145,39 @@ public void setCommonErrorHandler(CommonErrorHandler commonErrorHandler) {
* Set the {@link AfterRollbackProcessor} to use.
* @param afterRollbackProcessor the after rollback processor
*/
void setAfterRollbackProcessor(AfterRollbackProcessor<Object, Object> afterRollbackProcessor) {
public void setAfterRollbackProcessor(AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor) {
this.afterRollbackProcessor = afterRollbackProcessor;
}

/**
* Set the {@link RecordInterceptor} to use.
* @param recordInterceptor the record interceptor.
*/
void setRecordInterceptor(RecordInterceptor<Object, Object> recordInterceptor) {
public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
this.recordInterceptor = recordInterceptor;
}

/**
* Set the {@link BatchInterceptor} to use.
* @param batchInterceptor the batch interceptor.
*/
void setBatchInterceptor(BatchInterceptor<Object, Object> batchInterceptor) {
public void setBatchInterceptor(BatchInterceptor<K, V> batchInterceptor) {
this.batchInterceptor = batchInterceptor;
}

/**
* Set the thread name supplier to use.
* @param threadNameSupplier the thread name supplier to use
*/
void setThreadNameSupplier(Function<MessageListenerContainer, String> threadNameSupplier) {
public void setThreadNameSupplier(Function<MessageListenerContainer, String> threadNameSupplier) {
this.threadNameSupplier = threadNameSupplier;
}

/**
* Set the executor for threads that poll the consumer.
* @param listenerTaskExecutor task executor
*/
void setListenerTaskExecutor(SimpleAsyncTaskExecutor listenerTaskExecutor) {
public void setListenerTaskExecutor(SimpleAsyncTaskExecutor listenerTaskExecutor) {
this.listenerTaskExecutor = listenerTaskExecutor;
}

Expand All @@ -187,14 +188,14 @@ void setListenerTaskExecutor(SimpleAsyncTaskExecutor listenerTaskExecutor) {
* to configure
* @param consumerFactory the {@link ConsumerFactory} to use
*/
public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerFactory,
ConsumerFactory<Object, Object> consumerFactory) {
public void configure(ConcurrentKafkaListenerContainerFactory<K, V> listenerFactory,
ConsumerFactory<? super K, ? super V> consumerFactory) {
listenerFactory.setConsumerFactory(consumerFactory);
configureListenerFactory(listenerFactory);
configureContainer(listenerFactory.getContainerProperties());
}

private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory<Object, Object> factory) {
private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory<K, V> factory) {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
Listener properties = this.properties.getListener();
map.from(properties::getConcurrency).to(factory::setConcurrency);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,23 +114,23 @@ class KafkaAnnotationDrivenConfiguration {
@Bean
@ConditionalOnMissingBean
@ConditionalOnThreading(Threading.PLATFORM)
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
ConcurrentKafkaListenerContainerFactoryConfigurer<Object, Object> kafkaListenerContainerFactoryConfigurer() {
return configurer();
}

@Bean(name = "kafkaListenerContainerFactoryConfigurer")
@ConditionalOnMissingBean
@ConditionalOnThreading(Threading.VIRTUAL)
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurerVirtualThreads() {
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = configurer();
ConcurrentKafkaListenerContainerFactoryConfigurer<Object, Object> kafkaListenerContainerFactoryConfigurerVirtualThreads() {
ConcurrentKafkaListenerContainerFactoryConfigurer<Object, Object> configurer = configurer();
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("kafka-");
executor.setVirtualThreads(true);
configurer.setListenerTaskExecutor(executor);
return configurer;
}

private ConcurrentKafkaListenerContainerFactoryConfigurer configurer() {
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
private ConcurrentKafkaListenerContainerFactoryConfigurer<Object, Object> configurer() {
ConcurrentKafkaListenerContainerFactoryConfigurer<Object, Object> configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer<>();
configurer.setKafkaProperties(this.properties);
configurer.setBatchMessageConverter(this.batchMessageConverter);
configurer.setRecordMessageConverter(this.recordMessageConverter);
Expand All @@ -149,7 +149,7 @@ private ConcurrentKafkaListenerContainerFactoryConfigurer configurer() {
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConcurrentKafkaListenerContainerFactoryConfigurer<Object, Object> configurer,
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory,
ObjectProvider<ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>>> kafkaContainerCustomizer) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
*/
class ConcurrentKafkaListenerContainerFactoryConfigurerTests {

private ConcurrentKafkaListenerContainerFactoryConfigurer configurer;
private ConcurrentKafkaListenerContainerFactoryConfigurer<Object, Object> configurer;

private ConcurrentKafkaListenerContainerFactory<Object, Object> factory;

Expand All @@ -50,7 +50,7 @@ class ConcurrentKafkaListenerContainerFactoryConfigurerTests {
@BeforeEach
@SuppressWarnings("unchecked")
void setUp() {
this.configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
this.configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer<>();
this.properties = new KafkaProperties();
this.configurer.setKafkaProperties(this.properties);
this.factory = spy(new ConcurrentKafkaListenerContainerFactory<>());
Expand Down