Skip to content

Commit

Permalink
Add an option to disable automatic kafka interceptor configuration in…
Browse files Browse the repository at this point in the history
… spring starter (#12833)
  • Loading branch information
laurit authored Dec 7, 2024
1 parent 81c7713 commit c2c5d80
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,55 @@

package io.opentelemetry.instrumentation.spring.autoconfigure.internal.instrumentation.kafka;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import org.springframework.beans.factory.ObjectProvider;
import java.lang.reflect.Field;
import java.util.function.Supplier;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.RecordInterceptor;

class ConcurrentKafkaListenerContainerFactoryPostProcessor implements BeanPostProcessor {

private final ObjectProvider<OpenTelemetry> openTelemetryProvider;
private final ObjectProvider<ConfigProperties> configPropertiesProvider;
private final Supplier<SpringKafkaTelemetry> springKafkaTelemetry;

ConcurrentKafkaListenerContainerFactoryPostProcessor(
ObjectProvider<OpenTelemetry> openTelemetryProvider,
ObjectProvider<ConfigProperties> configPropertiesProvider) {
this.openTelemetryProvider = openTelemetryProvider;
this.configPropertiesProvider = configPropertiesProvider;
Supplier<SpringKafkaTelemetry> springKafkaTelemetry) {
this.springKafkaTelemetry = springKafkaTelemetry;
}

@SuppressWarnings("unchecked")
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (!(bean instanceof ConcurrentKafkaListenerContainerFactory)) {
return bean;
}

ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainerFactory =
(ConcurrentKafkaListenerContainerFactory<?, ?>) bean;
SpringKafkaTelemetry springKafkaTelemetry =
SpringKafkaTelemetry.builder(openTelemetryProvider.getObject())
.setCaptureExperimentalSpanAttributes(
configPropertiesProvider
.getObject()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
.build();
listenerContainerFactory.setBatchInterceptor(springKafkaTelemetry.createBatchInterceptor());
listenerContainerFactory.setRecordInterceptor(springKafkaTelemetry.createRecordInterceptor());
ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory =
(ConcurrentKafkaListenerContainerFactory<Object, Object>) bean;
SpringKafkaTelemetry springKafkaTelemetry = this.springKafkaTelemetry.get();

// use reflection to read existing values to avoid overwriting user configured interceptors
BatchInterceptor<Object, Object> batchInterceptor =
readField(listenerContainerFactory, "batchInterceptor", BatchInterceptor.class);
RecordInterceptor<Object, Object> recordInterceptor =
readField(listenerContainerFactory, "recordInterceptor", RecordInterceptor.class);
listenerContainerFactory.setBatchInterceptor(
springKafkaTelemetry.createBatchInterceptor(batchInterceptor));
listenerContainerFactory.setRecordInterceptor(
springKafkaTelemetry.createRecordInterceptor(recordInterceptor));

return listenerContainerFactory;
}

private static <T> T readField(Object container, String filedName, Class<T> fieldType) {
try {
Field field = AbstractKafkaListenerContainerFactory.class.getDeclaredField(filedName);
field.setAccessible(true);
return fieldType.cast(field.get(container));
} catch (Exception exception) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
import io.opentelemetry.instrumentation.spring.autoconfigure.internal.ConditionalOnEnabledInstrumentation;
import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -33,13 +35,29 @@ DefaultKafkaProducerFactoryCustomizer otelKafkaProducerFactoryCustomizer(
return producerFactory -> producerFactory.addPostProcessor(kafkaTelemetry::wrap);
}

@Bean
static SpringKafkaTelemetry getTelemetry(
ObjectProvider<OpenTelemetry> openTelemetryProvider,
ObjectProvider<ConfigProperties> configPropertiesProvider) {
return SpringKafkaTelemetry.builder(openTelemetryProvider.getObject())
.setCaptureExperimentalSpanAttributes(
configPropertiesProvider
.getObject()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
.build();
}

// static to avoid "is not eligible for getting processed by all BeanPostProcessors" warning
@Bean
@ConditionalOnProperty(
name = "otel.instrumentation.kafka.autoconfigure-interceptor",
havingValue = "true",
matchIfMissing = true)
static ConcurrentKafkaListenerContainerFactoryPostProcessor
otelKafkaListenerContainerFactoryBeanPostProcessor(
ObjectProvider<OpenTelemetry> openTelemetryProvider,
ObjectProvider<ConfigProperties> configPropertiesProvider) {
return new ConcurrentKafkaListenerContainerFactoryPostProcessor(
openTelemetryProvider, configPropertiesProvider);
() -> getTelemetry(openTelemetryProvider, configPropertiesProvider));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,12 @@
"description": "Enable the capture of experimental Kafka span attributes.",
"defaultValue": false
},
{
"name": "otel.instrumentation.kafka.autoconfigure-interceptor",
"type": "java.lang.Boolean",
"description": "Enable automatic configuration of tracing interceptors on <code>ConcurrentKafkaListenerContainerFactory</code> using a <code>BeanPostProcessor</code>. You may disable this if you wish to manually configure these interceptors.",
"defaultValue": true
},
{
"name": "otel.instrumentation.mongo.enabled",
"type": "java.lang.Boolean",
Expand Down

0 comments on commit c2c5d80

Please sign in to comment.