From 2d7eeab2763587ad0822db6a1ba077283f32ef0b Mon Sep 17 00:00:00 2001
From: Gary Russell
Date: Wed, 29 Apr 2020 11:34:56 -0400
Subject: [PATCH] Merge pull request #1470 from garyrussell/ARBPDecorate

Decorate exceptions for AfterRollbackProcessor
---
 .../listener/KafkaMessageListenerContainer.java      | 12 +++++++-----
 .../kafka/listener/TransactionalContainerTests.java  | 13 +++++++++----
 2 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index 2581c5284..652e2976c 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -1376,12 +1376,14 @@ private void batchAfterRollback(final ConsumerRecords<K, V> records,
 				final List<ConsumerRecord<K, V>> recordList, RuntimeException e,
 				AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {
 
+			RuntimeException rollbackException = decorateException(e);
 			try {
 				if (recordList == null) {
-					afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, e, false);
+					afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, rollbackException,
+							false);
 				}
 				else {
-					afterRollbackProcessorToUse.process(recordList, this.consumer, e, false);
+					afterRollbackProcessorToUse.process(recordList, this.consumer, rollbackException, false);
 				}
 			}
 			catch (KafkaException ke) {
@@ -1607,7 +1609,7 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
 			}
 			catch (RuntimeException e) {
 				this.logger.error(e, "Transaction rolled back");
-				recordAfterRollback(iterator, record, e);
+				recordAfterRollback(iterator, record, decorateException(e));
 			}
 			finally {
 				if (this.producerPerConsumerPartition) {
@@ -1838,8 +1840,8 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
 			}
 		}
 
-		private Exception decorateException(RuntimeException e) {
-			Exception toHandle = e;
+		private RuntimeException decorateException(RuntimeException e) {
+			RuntimeException toHandle = e;
 			if (toHandle instanceof ListenerExecutionFailedException) {
 				toHandle = new ListenerExecutionFailedException(toHandle.getMessage(), this.consumerGroupId,
 						toHandle.getCause());
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
index a2ada1537..f071bdabc 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
@@ -575,7 +575,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 	public void testMaxFailures() throws Exception {
 		logger.info("Start testMaxFailures");
 		Map<String, Object> props = KafkaTestUtils.consumerProps("txTestMaxFailures", "false", embeddedKafka);
-		props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
+		props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupInARBP");
 		props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
 		ContainerProperties containerProps = new ContainerProperties(topic3);
@@ -650,9 +650,10 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
 		Map<String, Object> map = new HashMap<>();
 		mapper.toHeaders(dltRecord.headers(), map);
 		MessageHeaders headers = new MessageHeaders(map);
-		assertThat(new String(headers.get(KafkaHeaders.DLT_EXCEPTION_FQCN, byte[].class))).contains("RuntimeException");
+		assertThat(new String(headers.get(KafkaHeaders.DLT_EXCEPTION_FQCN, byte[].class)))
+				.contains("ListenerExecutionFailedException");
 		assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class))
-				.isEqualTo("fail for max failures".getBytes());
+				.contains("fail for max failures".getBytes());
 		assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull();
 		assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)[3]).isEqualTo((byte) 0);
 		assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)[3]).isEqualTo((byte) 0);
@@ -663,7 +664,11 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
 		pf.destroy();
 		assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
 		verify(afterRollbackProcessor, times(4)).isProcessInTransaction();
-		verify(afterRollbackProcessor, times(4)).process(any(), any(), any(), anyBoolean());
+		ArgumentCaptor<Exception> captor = ArgumentCaptor.forClass(Exception.class);
+		verify(afterRollbackProcessor, times(4)).process(any(), any(), captor.capture(), anyBoolean());
+		assertThat(captor.getValue()).isInstanceOf(ListenerExecutionFailedException.class)
+				.extracting(ex -> ((ListenerExecutionFailedException) ex).getGroupId())
+				.isEqualTo("groupInARBP");
 		verify(afterRollbackProcessor).clearThreadState();
 		verify(dlTemplate).send(any(ProducerRecord.class));
 		verify(dlTemplate).sendOffsetsToTransaction(
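
Note: the practical effect of this change for user code is that a custom AfterRollbackProcessor now always receives a ListenerExecutionFailedException decorated with the consumer group id, instead of the raw listener exception. The following is a minimal sketch of a processor that reads the group id from the decorated exception; the class name and body are illustrative only (not part of this commit), assuming the four-argument process() contract visible in this diff:

    import java.util.List;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import org.springframework.kafka.listener.AfterRollbackProcessor;
    import org.springframework.kafka.listener.ListenerExecutionFailedException;

    // Hypothetical processor: logs which consumer group rolled back.
    public class GroupAwareAfterRollbackProcessor<K, V> implements AfterRollbackProcessor<K, V> {

        @Override
        public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
                Exception exception, boolean recoverable) {

            // Since this commit, the container passes the exception through
            // decorateException(), so the group id is available here.
            if (exception instanceof ListenerExecutionFailedException) {
                String groupId = ((ListenerExecutionFailedException) exception).getGroupId();
                System.out.println("Rollback in group " + groupId + "; "
                        + records.size() + " record(s) will be redelivered");
            }
        }

    }

Such a processor would be installed via the container's standard setAfterRollbackProcessor(...) setter; on every transaction rollback the container now supplies the decorated exception, for both record and batch listeners.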