Skip to content

Commit 89b19f1

Browse files
GH-235: Fix Retrieval & Lifecycle Config for KCL
Fixes: #235 * Corrected the initialization of Retrieval and Lifecycle Config. The start Scheduler was creating different instances. * GH-235 Fixing the failing tests for Enhanced Fan Out.
1 parent 429ae04 commit 89b19f1

File tree

2 files changed

+27
-25
lines changed

2 files changed

+27
-25
lines changed

src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import software.amazon.kinesis.exceptions.InvalidStateException;
4545
import software.amazon.kinesis.exceptions.ShutdownException;
4646
import software.amazon.kinesis.exceptions.ThrottlingException;
47+
import software.amazon.kinesis.lifecycle.LifecycleConfig;
4748
import software.amazon.kinesis.lifecycle.events.InitializationInput;
4849
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
4950
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
@@ -57,6 +58,7 @@
5758
import software.amazon.kinesis.processor.SingleStreamTracker;
5859
import software.amazon.kinesis.processor.StreamTracker;
5960
import software.amazon.kinesis.retrieval.KinesisClientRecord;
61+
import software.amazon.kinesis.retrieval.RetrievalConfig;
6062
import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
6163
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
6264
import software.amazon.kinesis.retrieval.polling.PollingConfig;
@@ -276,28 +278,6 @@ protected void onInit() {
276278
this.cloudWatchClient,
277279
this.workerId,
278280
this.recordProcessorFactory);
279-
280-
this.config.lifecycleConfig().taskBackoffTimeMillis(this.consumerBackoff);
281-
282-
RetrievalSpecificConfig retrievalSpecificConfig;
283-
284-
String singleStreamName = this.streams.length == 1 ? this.streams[0] : null;
285-
286-
if (this.fanOut) {
287-
retrievalSpecificConfig =
288-
new FanOutConfig(this.kinesisClient)
289-
.applicationName(this.consumerGroup)
290-
.streamName(singleStreamName);
291-
}
292-
else {
293-
retrievalSpecificConfig =
294-
new PollingConfig(this.kinesisClient)
295-
.streamName(singleStreamName);
296-
}
297-
298-
this.config.retrievalConfig()
299-
.glueSchemaRegistryDeserializer(this.glueSchemaRegistryDeserializer)
300-
.retrievalSpecificConfig(retrievalSpecificConfig);
301281
}
302282

303283
private StreamTracker buildStreamTracker() {
@@ -320,15 +300,36 @@ protected void doStart() {
320300
+ "because it does not make sense in case of [ListenerMode.batch].");
321301
}
322302

303+
LifecycleConfig lifecycleConfig = this.config.lifecycleConfig();
304+
lifecycleConfig.taskBackoffTimeMillis(this.consumerBackoff);
305+
306+
RetrievalSpecificConfig retrievalSpecificConfig;
307+
String singleStreamName = this.streams.length == 1 ? this.streams[0] : null;
308+
if (this.fanOut) {
309+
retrievalSpecificConfig =
310+
new FanOutConfig(this.kinesisClient)
311+
.applicationName(this.consumerGroup)
312+
.streamName(singleStreamName);
313+
}
314+
else {
315+
retrievalSpecificConfig =
316+
new PollingConfig(this.kinesisClient)
317+
.streamName(singleStreamName);
318+
}
319+
320+
RetrievalConfig retrievalConfig = this.config.retrievalConfig()
321+
.glueSchemaRegistryDeserializer(this.glueSchemaRegistryDeserializer)
322+
.retrievalSpecificConfig(retrievalSpecificConfig);
323+
323324
this.scheduler =
324325
new Scheduler(
325326
this.config.checkpointConfig(),
326327
this.config.coordinatorConfig(),
327328
this.config.leaseManagementConfig(),
328-
this.config.lifecycleConfig(),
329+
lifecycleConfig,
329330
this.config.metricsConfig(),
330331
this.config.processorConfig(),
331-
this.config.retrievalConfig());
332+
retrievalConfig);
332333

333334
this.executor.execute(this.scheduler);
334335
}

src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ void kclChannelAdapterReceivesRecords() {
112112
.join()
113113
.consumers();
114114

115-
assertThat(streamConsumers).hasSize(1);
115+
// Because FanOut is false, there would be no Stream Consumers.
116+
assertThat(streamConsumers).hasSize(0);
116117
}
117118

118119
@Configuration

0 commit comments

Comments
 (0)