Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support sending standardized uuid and priority values #92

Merged
merged 3 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,33 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.0.0] - 2024-10-07

### Changed

Added two new properties to the `TkmsMessage`:

- `uuid` [required] - specifies uniqueness of the message. Will be delivered to consumers in the `x-wise-uuid` header.
- `priority` [optional] - specifies priority of the message. Lower number - higher priority. Will be delivered to consumers in the `x-wise-uuid` header.

Consumers of messages that have UUID and priority headers can efficiently use provided values for deduplication and other processing purposes with no need to deserialize payloads.

Best practices for setting UUID value:
- Likely the UUID value provided will be stored and indexed on consumer side. It's recommended to use sequential UUIDs in such scenarios, which proved to yield better performance. One way to generate sequential UUIDs is by using [tw-base-utils](https://github.com/transferwise/tw-base-utils/blob/master/tw-base-utils/src/main/java/com/transferwise/common/baseutils/UuidUtils.java#L37) library.
- If payload already has UUID value then assign the same value to the corresponding `TkmsMessage`. It ensures that consumers of such messages can consistently deduplicate them by depending on one of those UUIDs. It simplifies consumers migration to standard header based UUID deduplication.
- If custom message identification mechanism is used (not based on UUID), still generate and assign UUID to the messages. However, be mindful of cases when messages are sent in non-transactional environments. For example, the same message might be sent twice with different UUIDs but the same identity (according to the custom identification mechanism).

<b>UUID presence is required by default.</b> This is the only breaking change in this version. The intention of such behaviour is to force producers to supply UUID value in messages sent, which will greatly benefit consumers of those messages. This requirement can be relaxed by setting the following configuration property.
```yaml
uuid-header-required: false
```

Added a new gauge metric that exposes value of `uuid-header-required` configuration property.
```
# 0 - uuid header isn't required, 1 - uuid header is required
tw_tkms_configuration_uuid_header_required
```

## [0.30.1] - 2024-08-08
### Changed
- MeterFilter's applied by the library are no longer explicitly applied and are instead
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.transferwise.kafka.tkms.demoapp;

import com.transferwise.common.baseutils.ExceptionUtils;
import com.transferwise.common.baseutils.UuidUtils;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender;
import com.transferwise.kafka.tkms.api.TkmsMessage;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -50,6 +51,8 @@ public void produce(long threadCount, long batchCount, long batchSize) {
String key = String.valueOf(finalT * batchCount * batchSize + finalI * batchSize + j);

TkmsMessage message = new TkmsMessage()
.setUuid(UuidUtils.generatePrefixCombUuid())
.setPriority(17L)
.setTopic("MyTopic")
.setTimestamp(Instant.now())
.setKey(key).setValue(textMessage.getBytes(StandardCharsets.UTF_8));
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.30.1
version=1.0.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.transferwise.kafka.tkms;

/**
* Set of standard headers used by the library.
*/
final class StandardHeaders {

static final String X_WISE_UUID = "x-wise-uuid";
static final String X_WISE_PRIORITY = "x-wise-priority";
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.transferwise.kafka.tkms.config.TkmsProperties.NotificationLevel;
import com.transferwise.kafka.tkms.config.TkmsProperties.NotificationType;
import com.transferwise.kafka.tkms.metrics.ITkmsMetricsTemplate;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -148,7 +149,11 @@ protected void checkActiveTransaction(int shard, boolean transactionActive, bool
@Override
public SendMessagesResult sendMessages(SendMessagesRequest request) {
request.getTkmsMessages().forEach(message -> messageDecorators.forEach(message::accept));
validateMessages(request);
for (int i = 0; i < request.getTkmsMessages().size(); i++) {
TkmsMessage tkmsMessage = request.getTkmsMessages().get(i);
addStandardHeaders(tkmsMessage);
validateMessage(tkmsMessage, i);
}

var transactionActive = TransactionSynchronizationManager.isActualTransactionActive();
var validatedTopics = new HashSet<String>();
Expand Down Expand Up @@ -263,6 +268,7 @@ public SendMessageResult sendMessage(SendMessageRequest request) {

checkActiveTransaction(shardPartition.getShard(), transactionActive, deferMessageRegistrationUntilCommit);

addStandardHeaders(request.getTkmsMessage());
validateMessage(message, 0);
validateMessageSize(message, 0);

Expand Down Expand Up @@ -374,13 +380,6 @@ public void afterCompletion(int status) {
}
}

protected void validateMessages(SendMessagesRequest request) {
for (int i = 0; i < request.getTkmsMessages().size(); i++) {
var tkmsMessage = request.getTkmsMessages().get(i);
validateMessage(tkmsMessage, i);
}
}

protected void validateMessage(TkmsMessage message, int messageIdx) {
Preconditions.checkNotNull(message, "%s: No message provided.", messageIdx);
Preconditions.checkArgument(!Strings.isNullOrEmpty(message.getTopic()), "%s: No topic provided.", messageIdx);
Expand All @@ -396,13 +395,23 @@ protected void validateMessage(TkmsMessage message, int messageIdx) {
message.getShard(), properties.getShardsCount());
}
Preconditions.checkNotNull(message.getValue(), "%s: Value can not be null.", messageIdx);
boolean uuidHeaderPresent = false;
if (message.getHeaders() != null) {
for (int headerIdx = 0; headerIdx < message.getHeaders().size(); headerIdx++) {
Header header = message.getHeaders().get(headerIdx);
Preconditions.checkNotNull(header.getValue(), "%s: Header value @{%s} can not be null.", messageIdx, headerIdx);
Preconditions.checkArgument(!Strings.isNullOrEmpty(header.getKey()), "%s: Header key @{%s} can not be null.", messageIdx, headerIdx);
uuidHeaderPresent |= StandardHeaders.X_WISE_UUID.equals(header.getKey());
}
}
if (properties.isUuidHeaderRequired() && !uuidHeaderPresent) {
throw new IllegalArgumentException(
yevhenii0 marked this conversation as resolved.
Show resolved Hide resolved
"%d: Message is required to have @{%s} header.".formatted(
messageIdx,
StandardHeaders.X_WISE_UUID
)
);
}
}

/**
Expand Down Expand Up @@ -434,6 +443,24 @@ protected void validateMessageSize(TkmsMessage message, int messageIdx) {
}
}

private static void addStandardHeaders(TkmsMessage tkmsMessage) {
if (tkmsMessage.getPriority() != null) {
tkmsMessage.addHeader(
new Header()
.setKey(StandardHeaders.X_WISE_PRIORITY)
.setValue(tkmsMessage.getPriority().toString().getBytes(StandardCharsets.UTF_8))
);
}
// uuid shall remain last header, so it can be quickly accessed using Headers#lastHeader
if (tkmsMessage.getUuid() != null) {
tkmsMessage.addHeader(
new Header()
.setKey(StandardHeaders.X_WISE_UUID)
.setValue(tkmsMessage.getUuid().toString().getBytes(StandardCharsets.UTF_8))
);
}
}

private int utf8Length(CharSequence s) {
if (s == null) {
return 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.transferwise.kafka.tkms.api;

import com.transferwise.common.baseutils.UuidUtils;
import com.transferwise.kafka.tkms.CompressionAlgorithm;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.PositiveOrZero;
Expand Down Expand Up @@ -71,6 +73,31 @@ public class TkmsMessage {
*/
private Map<?, ?> metadata;

/**
* Uniquely identifies this message for consumers.
*
* <p>If value is set then it will be added to headers with {@code x-wise-uuid} key.
*
* <p>Having UUID in header allows consumer to run deduplication check on this value without need to deserialize payload.
* If payload provides uuid it must be the same as this value so that consumers that depend on either of these values can have consistent
* deduplication.
*
* <p>Prefer using sequential uuids (e.g. {@link UuidUtils#generatePrefixCombUuid()}) which are proved to yield better performance.
*/
private UUID uuid;

/**
* Defines priority of this message for consumers.
*
* <p>Lower value means higher priority. For example, 0 is higher priority than 10.
*
* <p>If value is set then it will be added to headers with {@code x-wise-priority} key.
*
* <p>Having priority in header allows consumer to derive priority without need to deserialize payload. For example, it can be useful
* when consumers filter messages based on priority before deciding how to process those.
*/
private Long priority;

public TkmsMessage addHeader(Header header) {
if (headers == null) {
headers = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.transferwise.common.baseutils.validation.LegacyResolvedValue;
import com.transferwise.common.baseutils.validation.ResolvedValue;
import com.transferwise.kafka.tkms.CompressionAlgorithm;
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -238,6 +239,11 @@ public void afterPropertiesSet() {
*/
private boolean deferMessageRegistrationUntilCommit = false;

/**
* Whether every message sent is required to have {@code x-wise-uuid} header. See {@link TkmsMessage#getUuid()} for more details.
*/
private boolean uuidHeaderRequired = true;
yevhenii0 marked this conversation as resolved.
Show resolved Hide resolved

@Valid
@jakarta.validation.Valid
private Compression compression = new Compression();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,9 @@
import com.transferwise.kafka.tkms.config.TkmsProperties;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Meter.Type;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.config.MeterFilter;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.Data;
Expand All @@ -37,6 +31,7 @@ public class TkmsMetricsTemplate implements ITkmsMetricsTemplate, InitializingBe
// Miccrometer 1.13 (which comes with Spring boot 3.3) doesn't properly convert gauge metrics with info suffix when using underscore,
// using dot here as a workaround
public static final String GAUGE_LIBRARY_INFO = "tw.library.info";
public static final String GAUGE_CONFIGURATION_UUID_HEADER_REQUIRED = "tw_tkms_configuration_uuid_header_required";
public static final String TIMER_PROXY_POLL = "tw_tkms_proxy_poll";
public static final String GAUGE_PROXY_POLL_IN_PROGRESS = "tw_tkms_proxy_poll_in_progress";
public static final String TIMER_PROXY_CYCLE = "tw_tkms_proxy_cycle";
Expand Down Expand Up @@ -288,6 +283,9 @@ public void registerLibrary() {
Gauge.builder(GAUGE_LIBRARY_INFO, () -> 1d).tags("version", version, "library", "tw-tkms")
.description("Provides metadata about the library, for example the version.")
.register(meterCache.getMeterRegistry());
Gauge.builder(GAUGE_CONFIGURATION_UUID_HEADER_REQUIRED, tkmsProperties, props -> props.isUuidHeaderRequired() ? 1d : 0d)
.description("0 - uuid header isn't required, 1 - uuid header is required")
.register(meterCache.getMeterRegistry());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import com.transferwise.common.baseutils.UuidUtils;
import com.transferwise.common.baseutils.clock.TestClock;
import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.kafka.tkms.api.TkmsMessage;
Expand Down Expand Up @@ -106,7 +107,12 @@ void testIfEarliestMessageTrackerBehavesAsExpected() {

protected void sendMessageAndWaitForArrival() {
transactionsHelper.withTransaction().run(() -> {
var result = tkms.sendMessage(new TkmsMessage().setTopic(testTopic).setValue("Hello Kristo!".getBytes(StandardCharsets.UTF_8)));
var result = tkms.sendMessage(
new TkmsMessage()
.setUuid(UuidUtils.generatePrefixCombUuid())
.setTopic(testTopic)
.setValue("Hello Kristo!".getBytes(StandardCharsets.UTF_8))
);
log.info("Registered a message with storage id " + result.getStorageId());
}
);
Expand Down
Loading
Loading