Skip to content

Commit

Permalink
Merge pull request #2409 from ozangunalp/kafka_request_reply
Browse files Browse the repository at this point in the history
Kafka Request Reply emitter and manual partition assignment
  • Loading branch information
ozangunalp authored Dec 13, 2023
2 parents f277c46 + 1491bde commit c79b8bb
Show file tree
Hide file tree
Showing 36 changed files with 2,294 additions and 70 deletions.
1 change: 1 addition & 0 deletions documentation/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ nav:
- 'Customizing Default Kafka Configuration': kafka/default-configuration.md
- 'Test Companion for Kafka': kafka/test-companion.md
- 'Kafka Transactions and Exactly-Once Processing': kafka/transactions.md
- 'Kafka Request/Reply': kafka/request-reply.md

- AMQP 1.0:
- amqp/amqp.md
Expand Down
32 changes: 32 additions & 0 deletions documentation/src/main/docs/kafka/receiving-kafka-records.md
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,38 @@ Conversely, if the processing throws an exception, all messages are
*nacked*, applying the failure strategy for all the records inside the
batch.

## Manual topic-partition assignment

The default behavior of Kafka incoming channels is to subscribe to one or more topics in order to receive records from the Kafka broker.
Channel attributes `topic` and `topics` allow specifying topics to subscribe to,
or `pattern` attribute allows to subscribe to all topics matching a regular expression.
Subscribing to topics allows partitioning consumption of topics by dynamically assigning (rebalancing) partitions between members of a consumer group.

The `assign-seek` configuration attribute allows manually assigning topic-partitions to a Kafka incoming channel,
and optionally seek to a specified offset in the partition to start consuming records.
If `assign-seek` is used, the consumer will not be dynamically subscribed to topics,
but instead will statically assign the described partitions.
In manual topic-partition rebalancing doesn't happen and therefore rebalance listeners are never called.

The attribute takes a list of triplets separated by commas: `<topic>:<partition>:<offset>`.

For example, the following configuration

```properties
mp.messaging.incoming.data.assign-seek=topic1:0:10, topic2:1:20
```

assigns the consumer to:
- Partition 0 of topic 'topic1', setting the initial position at offset 10.
- Partition 1 of topic 'topic2', setting the initial position at offset 20.

The topic, partition, and offset in each triplet can have the following variations:
- If the topic is omitted, the configured `topic` will be used.
- If the offset is omitted, partitions are assigned to the consumer but won't be seeked to offset.
- If offset is 0, it seeks to the beginning of the topic-partition.
- If offset is -1, it seeks to the end of the topic-partition.


## Stateful processing with Checkpointing

!!!warning "Experimental"
Expand Down
131 changes: 131 additions & 0 deletions documentation/src/main/docs/kafka/request-reply.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Kafka Request/Reply

!!!warning "Experimental"
Kafka Request Reply Emitter is an experimental feature.

The Kafka [Request-Reply](https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html) pattern allows you to publish a message to a Kafka topic and then await for a reply message that responds to the initial request.

The `KafkaRequestReply` emitter implements the requestor (or the client) of the request-reply pattern for Kafka outbound channels:

``` java
{{ insert('kafka/outbound/KafkaRequestReplyEmitter.java') }}
```

The `request` method publishes the request record to the configured target topic of the outgoing channel,
and polls a reply topic (by default, the target topic with `-replies` suffix) for a reply record.
When the reply is received the returned `Uni` is completed with the record value.

The request send operation generates a correlation id and sets a header (by default `REPLY_CORRELATION_ID`),
which it expects to be sent back in the reply record. Two additional headers are set on the request record:

- The topic from which the reply is expected, by default `REPLY_TOPIC` header,
which can be configured using the `reply.topic` channel attribute.
- Optionally, the partition from which the reply is expected, by default `REPLY_PARTITION` header.
The reply partition header is added only when the Kafka request reply is configured specifically to receive records from a topic-partition
, using the `reply.partition` channel attribute.
The reply partition header integer value is encoded in 4 bytes,
and helper methods `KafkaRequestReply#replyPartitionFromBytes` and `KafkaRequestReply#replyPartitionToBytes` can be used for custom operations.

The replier (or the server) can be implemented using a Reactive Messaging processor:

``` java
{{ insert('kafka/outbound/KafkaReplier.java') }}
```

Kafka outgoing channels detect default `REPLY_CORRELATION_ID`, `REPLY_TOPIC` and `REPLY_PARTITION` headers
and send the reply record to the expected topic-partition by propagating back the correlation id header.

Default headers can be configured, using `reply.correlation-id.header`, `reply.topic.header` and `reply.partition.header` channel attributes.
If custom headers are used the reply server needs some more manual work.
Given the following request/reply outgoing configuration:

```properties
mp.messaging.outgoing.reqrep.topic=requests
mp.messaging.outgoing.reqrep.reply.correlation-id.header=MY_CORRELATION
mp.messaging.outgoing.reqrep.reply.topic.header=MY_TOPIC
mp.messaging.outgoing.reqrep.reply.partition.header=MY_PARTITION
mp.messaging.incoming.request.topic=requests
```

The reply server can be implemented as the following:

``` java
{{ insert('kafka/outbound/KafkaCustomHeaderReplier.java') }}
```

## Requesting with `Message` types

Like the core Emitter's `send` methods, `request` method also can receive a `Message` type and return a message:

``` java
{{ insert('kafka/outbound/KafkaRequestReplyMessageEmitter.java') }}
```

!!! note
The ingested reply type of the `KafkaRequestReply` is discovered at runtime,
in order to configure a `MessageConveter` to be applied on the incoming message before returning the `Uni` result.

## Scaling Request/Reply

If multiple requestor instances are configured on the same outgoing topic, and the same reply topic,
each requestor consumer will generate a unique consumer group.id and
therefore all requestor instances will receive replies of all instances. If an observed correlation id doesn't match
the id of any pending replies, the reply is simply discarded.
With the additional network traffic this allows scaling requestors, (and repliers) dynamically.

Alternatively, requestor instances can be configured to consume replies from dedicated topics using `reply.topic` attribute,
or distinct partitions of a single topic, using `reply.partition` attribute.
The later will configure the Kafka consumer to assign statically to the given partition.

## Pending replies and reply timeout

By default, the `Uni` returned from the `request` method is configured to fail with timeout exception if no replies is received after 5 seconds.
This timeout is configurable with the channel attribute `reply.timeout`.

A snapshot of the list of pending replies is available through the `KafkaRequestReply#getPendingReplies` method.

## Waiting for topic-partition assignment

The requestor can be found in a position where a request is sent, and it's reply is already published to the reply topic,
before the requestor starts and polls the consumer.
In case the reply consumer is configured with `auto.offset.reset=latest`, which is the default value, this can lead to the requestor missing replies.
If `auto.offset.reset` is `latest`, at wiring time, before any request can take place, the `KafkaRequestReply`
finds partitions that the consumer needs to subscribe and waits for their assignment to the consumer.
On other occasons the `KafkaRequestReply#waitForAssignments` method can be used.

## Correlation Ids

The Kafka Request/Reply allows configuring the correlation id mechanism completely through a `CorrelationIdHandler` implementation.
The default handler is based on randomly generated UUID strings, written to byte array in Kafka record headers.
The correlation id handler implementation can be configured using the `reply.correlation-id.handler` attribute.
As mentioned the default configuration is `uuid`,
and an alternative `bytes` implementation can be used to generate 12 bytes random correlation ids.

Custom handlers can be implemented by proposing a CDI-managed bean with `@Identifier` qualifier.

## Reply Error Handling

If the reply server produces an error and can or would like to propagate the error back to the requestor, failing the returned `Uni`.

If configured using the `reply.failure.handler` channel attribute,
the `ReplyFailureHandler` implementations are discovered through CDI, matching the `@Identifier` qualifier.

A sample reply error handler can lookup header values and return the error to be thrown by the reply:

``` java
{{ insert('kafka/outbound/MyReplyFailureHandler.java') }}
```

`null` return value indicates that no error has been found in the reply record, and it can be delivered to the application.

## Advanced configuration for the Kafka consumer

The underlying Kafka consumer can be configured with the `reply` property prefix.
For example the underlying Kafka consuemer can be configured to batch mode using:

```properties
mp.messaging.outgoing.reqrep.topic=requests
mp.messaging.outgoing.reqrep.reply.topic=quote-results
mp.messaging.outgoing.reqrep.reply.batch=true
mp.messaging.outgoing.reqrep.reply.commit-strategy=latest
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package kafka.outbound;

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply;

@ApplicationScoped
public class KafkaCustomHeaderReplier {

Random rand = new Random();

@Incoming("request")
@Outgoing("reply")
ProducerRecord<String, Integer> process(ConsumerRecord<String, String> request) {
Header topicHeader = request.headers().lastHeader("MY_TOPIC");
if (topicHeader == null) {
// Skip
return null;
}
String myTopic = new String(topicHeader.value());
int generateValue = rand.nextInt(100);
Header partitionHeader = request.headers().lastHeader("MY_PARTITION");
if (partitionHeader == null) {
// Propagate incoming headers, including the correlation id header
return new ProducerRecord<>(myTopic, null, request.key(), generateValue, request.headers());
}
// Send the replies to extracted myTopic-myPartition
int myPartition = KafkaRequestReply.replyPartitionFromBytes(partitionHeader.value());
return new ProducerRecord<>(myTopic, myPartition, request.key(), generateValue, request.headers());
}
}
20 changes: 20 additions & 0 deletions documentation/src/main/java/kafka/outbound/KafkaReplier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package kafka.outbound;

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class KafkaReplier {

Random rand = new Random();

@Incoming("request")
@Outgoing("reply")
int handleRequest(String request) {
return rand.nextInt(100);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package kafka.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply;

@ApplicationScoped
public class KafkaRequestReplyEmitter {

@Inject
@Channel("my-request")
KafkaRequestReply<String, Integer> quoteRequest;

public Uni<Integer> requestQuote(String request) {
return quoteRequest.request(request);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package kafka.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply;

@ApplicationScoped
public class KafkaRequestReplyMessageEmitter {

@Inject
@Channel("my-request")
KafkaRequestReply<String, Integer> quoteRequest;

public Uni<Message<Integer>> requestMessage(String request) {
return quoteRequest.request(Message.of(request)
.addMetadata(OutgoingKafkaRecordMetadata.builder()
.withKey(request)
.build()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package kafka.outbound;

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.kafka.common.header.Header;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.reply.ReplyFailureHandler;

@ApplicationScoped
@Identifier("my-reply-error")
public class MyReplyFailureHandler implements ReplyFailureHandler {

@Override
public Throwable handleReply(KafkaRecord<?, ?> replyRecord) {
Header header = replyRecord.getHeaders().lastHeader("REPLY_ERROR");
if (header != null) {
return new IllegalArgumentException(new String(header.value()));
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.kafka.common.header.Header;
Expand Down Expand Up @@ -33,6 +34,15 @@ public static <K> OutgoingKafkaRecordMetadataBuilder<K> builder() {
return new OutgoingKafkaRecordMetadataBuilder<>();
}

public static <K> OutgoingKafkaRecordMetadataBuilder<K> from(OutgoingKafkaRecordMetadata<K> other) {
return new OutgoingKafkaRecordMetadataBuilder<K>()
.withKey(other.getKey())
.withTopic(other.getTopic())
.withHeaders(other.getHeaders())
.withTimestamp(other.getTimestamp())
.withPartition(other.getPartition());
}

protected OutgoingKafkaRecordMetadata(String topic, K key, int partition, Instant timestamp,
Headers headers) {
this.topic = topic;
Expand Down Expand Up @@ -141,7 +151,7 @@ public OutgoingKafkaRecordMetadataBuilder<K> withTimestamp(Instant timestamp) {
}

/**
* Specify headers for Kafka the timestamp for the Kafka record
* Specify headers for the Kafka record
*
* @param headers the headers
* @return this builder
Expand All @@ -151,6 +161,23 @@ public OutgoingKafkaRecordMetadataBuilder<K> withHeaders(Headers headers) {
return this;
}

/**
* Add headers for the Kafka record
*
* @param headers the headers
* @return this builder
*/
public OutgoingKafkaRecordMetadataBuilder<K> addHeaders(RecordHeader... headers) {
if (this.headers == null) {
return withHeaders(Arrays.asList(headers));
} else {
for (Header header : headers) {
this.headers.add(header);
}
}
return this;
}

/**
* Specify the headers for the Kafka record
*
Expand Down
8 changes: 8 additions & 0 deletions smallrye-reactive-messaging-kafka/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@
"code": "java.class.removed",
"old": "class io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata<K>",
"justification": "Removed deprecated OutgoingKafkaRecordMetadata"
},
{
"code": "java.annotation.attributeValueChanged",
"old": "class io.smallrye.reactive.messaging.kafka.KafkaConnector",
"new": "class io.smallrye.reactive.messaging.kafka.KafkaConnector",
"annotationType": "io.smallrye.reactive.messaging.annotations.ConnectorAttributes",
"attribute": "value",
"justification": "Added 'assign-seek' attribute for incoming channels"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@

@ConnectorAttribute(name = "topics", type = "string", direction = Direction.INCOMING, description = "A comma-separating list of topics to be consumed. Cannot be used with the `topic` or `pattern` properties")
@ConnectorAttribute(name = "pattern", type = "boolean", direction = Direction.INCOMING, description = "Indicate that the `topic` property is a regular expression. Must be used with the `topic` property. Cannot be used with the `topics` property", defaultValue = "false")
@ConnectorAttribute(name = "assign-seek", type = "string", direction = Direction.INCOMING, description = "Assign partitions and optionally seek to offsets, instead of subscribing to topics. A comma-separating list of triplets in form of `<topic>:|<partition>|:<offset>` to assign statically to the consumer and seek to the given offsets. Offset `0` seeks to beginning and offset `-1` seeks to the end of the topic-partition. If the topic is omitted the configured topic will be used. If the offset is omitted partitions are assigned to the consumer but won't be seeked to offset.")
@ConnectorAttribute(name = "key.deserializer", type = "string", direction = Direction.INCOMING, description = "The deserializer classname used to deserialize the record's key", defaultValue = "org.apache.kafka.common.serialization.StringDeserializer")
@ConnectorAttribute(name = "value.deserializer", type = "string", direction = Direction.INCOMING, description = "The deserializer classname used to deserialize the record's value", mandatory = true)
@ConnectorAttribute(name = "fetch.min.bytes", type = "int", direction = Direction.INCOMING, description = "The minimum amount of data the server should return for a fetch request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive.", defaultValue = "1")
Expand Down
Loading

0 comments on commit c79b8bb

Please sign in to comment.