-
-
Notifications
You must be signed in to change notification settings - Fork 57
/
Copy pathHelidonKafka.java
88 lines (79 loc) · 3.68 KB
/
HelidonKafka.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package helidon;
import io.helidon.common.reactive.Multi;
import io.helidon.messaging.Channel;
import io.helidon.messaging.Messaging;
import io.helidon.messaging.connectors.kafka.KafkaConfigBuilder;
import io.helidon.messaging.connectors.kafka.KafkaConnector;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.junit.Test;
/**
 * Example of Helidon SE reactive messaging over Kafka: one messaging instance publishes a
 * handful of strings to a topic and another one listens on the same topic and prints them.
 *
 * <p>The test needs a Kafka broker reachable at {@link #KAFKA_SERVER}.
 */
public class HelidonKafka {

    /** Bootstrap server shared by the publisher and the consumer. */
    private static final String KAFKA_SERVER = "localhost:9092";

    /** Topic the publisher writes to and the consumer reads from. */
    private static final String TOPIC = "helidon_topic";

    /** How long the test keeps both messaging instances alive, in milliseconds. */
    private static final long RUN_TIME_MILLIS = 10000;

    /**
     * Starts the consumer, then the publisher, keeps both running for
     * {@link #RUN_TIME_MILLIS} and finally stops them.
     *
     * <p>The consumer is started first because it is configured with
     * {@code AutoOffsetReset.LATEST}: records sent before it has subscribed would otherwise be
     * skipped when the group has no committed offset. Starting it first narrows that window,
     * it does not remove it, since the subscription is established asynchronously.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void publisherConsumerKafka() throws InterruptedException {
        Messaging consumer = kafkaConsumer();
        try {
            Messaging publisher = kafkaPublisher();
            try {
                Thread.sleep(RUN_TIME_MILLIS);
            } finally {
                // Stop explicitly so the messaging instance is not left running after the test.
                publisher.stop();
            }
        } finally {
            consumer.stop();
        }
    }

    /**
     * A Kafka consumer implementation. Consumer and publisher both communicate through a
     * {@link Channel}.
     *
     * <p>The channel gets a {@code publisherConfig}, which expects a Config; the
     * {@link KafkaConnector} configuration DSL fills in everything needed to reach the broker
     * (bootstrap servers, group id, topic, offset reset policy, auto commit, deserializers).
     *
     * <p>The {@link Messaging} builder is then given the connector via {@code connector(...)},
     * here {@link KafkaConnector}, and a listener that prints every received payload.
     *
     * @return the started messaging instance; the caller is responsible for stopping it
     */
    private Messaging kafkaConsumer() {
        Channel<String> consumerChannel = Channel.<String>builder()
                .name("kafka.png-connector")
                .publisherConfig(KafkaConnector.configBuilder()
                        .bootstrapServers(KAFKA_SERVER)
                        .groupId("helidon-group")
                        .topic(TOPIC)
                        .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.LATEST)
                        .enableAutoCommit(true)
                        .keyDeserializer(StringDeserializer.class)
                        .valueDeserializer(StringDeserializer.class)
                        .build()
                )
                .build();

        return Messaging.builder()
                .connector(KafkaConnector.create())
                .listener(consumerChannel, payload -> System.out.println("Kafka says: " + payload))
                .build()
                .start();
    }

    /**
     * A Kafka publisher implementation. Consumer and publisher both communicate through a
     * {@link Channel}.
     *
     * <p>The channel gets a {@code subscriberConfig}, which expects a Config; the
     * {@link KafkaConnector} configuration DSL fills in everything needed to reach the broker
     * (bootstrap servers, topic, serializers).
     *
     * <p>The {@link Messaging} builder is then given the connector via {@code connector(...)},
     * here {@link KafkaConnector}, and a publisher via {@code publisher(...)}: a {@link Multi}
     * of five strings, each wrapped into a {@link Message}.
     *
     * @return the started messaging instance; the caller is responsible for stopping it
     */
    private Messaging kafkaPublisher() {
        Channel<String> publisherChannel = Channel.<String>builder()
                .subscriberConfig(KafkaConnector.configBuilder()
                        .bootstrapServers(KAFKA_SERVER)
                        .topic(TOPIC)
                        .keySerializer(StringSerializer.class)
                        .valueSerializer(StringSerializer.class)
                        .build()
                )
                .build();

        return Messaging.builder()
                .connector(KafkaConnector.create())
                .publisher(publisherChannel,
                        Multi.just("hello", "kafka", "world", "helidon", "connector")
                                .map(Message::of))
                .build()
                .start();
    }
}