Draft
Changes from all commits
77 commits
4ae6814
KAFKA-18993 Remove confusing notable change section from upgrade.html…
frankvicky Mar 15, 2025
16da5a8
MINOR: Bump to 4.0.1-SNAPSHOT (#19224)
dajac Mar 18, 2025
4dd893b
KAFKA-806 Index may not always observe log.index.interval.bytes (#18842)
FrankYang0529 Mar 20, 2025
617c96c
KAFKA-15931: Cancel RemoteLogReader gracefully (#19331)
jeqo Apr 1, 2025
b0b4f42
KAFKA-18067: Add a flag to disable producer reset during active task …
frankvicky Apr 3, 2025
2c48809
KAFKA-18713: Fix FK Left-Join result race condition (#19005)
nilmadhab Apr 3, 2025
71c9d83
KAFKA-16407: Fix foreign key INNER join on change of FK from/to a nul…
AyoubOm Apr 6, 2025
8de7b69
MINOR: small optimization by judgment (#19386)
gongxuanzhang Apr 6, 2025
33fa572
MINOR: remove transform and through from repartition description (#19…
FrankYang0529 Apr 9, 2025
4844bc3
KAFKA-18984: Reset interval.ms By Using kafka-client-metrics.sh (#19213)
Parkerhiphop Mar 24, 2025
4dbe473
KAFKA-18723; Better handle invalid records during replication (#18852)
jsancio Feb 26, 2025
952c8a5
KAFKA-18991: FetcherThread should match leader epochs between fetch r…
frankvicky Mar 25, 2025
83f6a1d
KAFKA-18991; Missing change for cherry-pick
jsancio Apr 9, 2025
de27409
KAFKA-18962: Fix onBatchRestored call in GlobalStateManagerImpl (#19188)
fhussonnois Apr 9, 2025
143fcb1
KAFKA-19071: Fix doc for remote.storage.enable (#19345)
azhar2407 Apr 14, 2025
f98dec9
KAFKA-19147: Start authorizer before group coordinator to ensure coor…
rajinisivaram Apr 16, 2025
8a515da
KAFKA-19054: StreamThread exception handling with SHUTDOWN_APPLICATIO…
apalan60 Apr 17, 2025
56743b3
MINOR: Supress stdout when checking Log4j 1.x configuration compatibi…
omkreddy Apr 17, 2025
3901c85
KAFKA-19166: Fix RC tag in release script (#19518)
mumrah Apr 22, 2025
0297ba2
KAFKA-19192; Old bootstrap checkpoint files cause problems updated se…
cmccabe Apr 24, 2025
0832c2c
KAFKA-19195: Only send the right group ID subset to each GC shard (#1…
lucasbru Apr 28, 2025
c206887
KAFKA-19131: Adjust remote storage reader thread maximum pool size to…
kamalcph May 4, 2025
1f856d4
MINOR: exclude error_prone_annotations lib from caffeine dependency (…
showuon May 6, 2025
cf3c177
KAFKA-19160;KAFKA-19164; Improve performance of fetching stable offse…
squah-confluent May 12, 2025
5015183
KAFKA-19242: Fix commit bugs caused by race condition during rebalanc…
chickenchickenlove May 12, 2025
9aad184
MINOR: Fix version in 4.0 branch (#19686)
dajac May 12, 2025
48f7561
KAFKA-19163: Avoid deleting groups with pending transactional offsets…
squah-confluent May 13, 2025
d64a970
KAFKA-18688: Fix uniform homogeneous assignor stability (#19677)
squah-confluent May 13, 2025
f99db08
KAFKA-19275 client-state and thread-state metrics are always "Unavail…
brandboat May 14, 2025
d7d7876
KAFKA-19274; Group Coordinator Shards are not unloaded when `__consum…
dajac May 15, 2025
62c6697
KAFKA-19208: KStream-GlobalKTable join should not drop left-null-key …
mjsax May 16, 2025
923086d
KAFKA-19171: Kafka Streams crashes with UnsupportedOperationException…
mjsax May 16, 2025
14fd498
MINOR: API Responses missing latest version in Kafka protocol guide (…
andy1li May 20, 2025
3170e11
KAFKA-18345; Prevent livelocked elections (#19658)
ahuang98 May 12, 2025
e9c5069
KAFKA-18687: Setting the subscriptionMetadata during conversion to co…
dongnuo123 May 27, 2025
ded7653
MINOR: Fix some Request toString methods (#19655) (#19689)
ahuang98 May 27, 2025
1cc14f6
KAFKA-19334 MetadataShell execution unintentionally deletes lock file…
ocadaruma Jun 9, 2025
00a1b1e
Bump the commons-beanutils for CVE-2025-48734. Since `commons-validator`
showuon Jun 11, 2025
254c1fa
MINOR: Fixing client telemetry validate request (#19959)
apoorvmittal10 Jun 12, 2025
c6b44b5
Cherry Pick KAFKA-19367 to 4.0 (#19958)
rreddy-22 Jun 14, 2025
26e5a53
HOTFIX: Correcting build after cherry-pick (#19969)
apoorvmittal10 Jun 16, 2025
c8b8adf
KAFKA-19367: Follow up bug fix (#19991)
rreddy-22 Jun 23, 2025
9fcfe54
KAFKA-19407 Fix potential IllegalStateException when appending to tim…
ocadaruma Jun 24, 2025
d426d65
MINOR: fix reassign command bug (#20003)
DL1231 Jun 24, 2025
52a5b88
KAFKA-19411: Fix deleteAcls bug which allows more deletions than max …
ahuang98 Jun 24, 2025
7e51a2a
KAFKA-19294: Fix BrokerLifecycleManager RPC timeouts (#19745)
cmccabe Jun 24, 2025
46e843d
KAFKA-19383: Handle the deleted topics when applying ClearElrRecord (…
CalvinConfluent Jun 25, 2025
15ec053
KAFKA-18656 Backport KAFKA-18597 to 4.0 (#20026)
LoganZhuZzz Jun 25, 2025
6351bc0
MINOR: Fix response for consumer group describe with empty group id (…
rajinisivaram Jun 25, 2025
4ce6f5c
MINOR: Improve ProcessorContext JavaDocs (#20042)
mjsax Jun 26, 2025
45327fd
KAFKA-18035: Backport TransactionsTest testBumpTransactionalEpochWith…
gaurav-narula Jul 4, 2025
8433ac4
KAFKA-19221 Propagate IOException on LogSegment#close (#20072)
gaurav-narula Jul 5, 2025
80b9abe
KAFKA-19444: Add back JoinGroup v0 & v1 (#20116)
ijuma Jul 7, 2025
d95857a
KAFKA-19504: Remove unused metrics reporter initialization in KafkaAd…
bbejeck Jul 15, 2025
eefee6d
KAFKA-19427 Allow the coordinator to grow its buffer dynamically (#20…
mingyen066 Jul 16, 2025
70c5164
Cherrypick "MINOR : Handle error for client telemetry push (#19881)" …
k-raina Jul 16, 2025
12e695e
KAFKA-19520 Bump Commons-Lang for CVE-2025-48924 (#20196)
wernerdv Jul 19, 2025
74d93ad
KAFKA-19501 Update OpenJDK base image from buster to bullseye (#20165)
dalaoqi Jul 16, 2025
b3aeb69
Bump version to 4.0.1
clolov Jul 24, 2025
01d6edc
Bump version to 4.0.1
clolov Jul 24, 2025
d183e52
Merge tag '4.0.1-rc0' into 4.0
clolov Jul 24, 2025
4432d6a
Bump version to 4.0.1
clolov Jul 24, 2025
72fbb37
Merge tag '4.0.1-rc0' into 4.0
clolov Jul 24, 2025
aaf1864
Bump version to 4.0.1
clolov Jul 24, 2025
9c3fffb
Merge tag '4.0.1-rc0' into 4.0
clolov Jul 24, 2025
b81230c
Bump version to 4.0.1
clolov Jul 24, 2025
2a45b4f
Merge tag '4.0.1-rc0' into 4.0
clolov Jul 24, 2025
deb5891
KAFKA-19529: State updater sensor names should be unique (#20262) (#2…
lucasbru Aug 1, 2025
0f9b312
KAFKA-19576 Fix typo in state-change log filename after rotate (#20269)
jaredharley Aug 5, 2025
704a7a4
MINOR: add missing section to TOC (#20305)
mjsax Aug 5, 2025
7b507bd
MINOR: Remove SPAM URL in Streams Documentation (#20321)
rauwuckl Aug 8, 2025
de85722
KAFKA-15307: Kafka Streams configuration docs outdated (#20329)
shashankhs11 Aug 17, 2025
c5169ca
Bump version to 4.0.1
clolov Aug 18, 2025
d3b3aa5
KAFKA-19668: processValue() must be declared as value-changing operat…
mjsax Sep 6, 2025
255c612
KAFKA-19668: update upgrade docs (#20484)
mjsax Sep 8, 2025
b31ce61
Bump version to 4.0.1
clolov Sep 9, 2025
a3035d6
Merge branch 'apache-kafka-4.0.1' into inkless-4.0
jeqo Jan 16, 2026
2 changes: 1 addition & 1 deletion .gitignore
@@ -62,4 +62,4 @@ docker/test/report_*.html
kafka.Kafka
__pycache__

_data/
_data/
7 changes: 3 additions & 4 deletions LICENSE-binary
@@ -206,13 +206,12 @@ This project bundles some components that are also licensed under the Apache
License Version 2.0:

- caffeine-3.1.1
- commons-beanutils-1.9.4
- commons-beanutils-1.11.0
- commons-collections-3.2.2
- commons-digester-2.1
- commons-lang3-3.12.0
- commons-logging-1.3.2
- commons-lang3-3.18.0
- commons-logging-1.3.5
- commons-validator-1.9.0
- error_prone_annotations-2.14.0
- jackson-annotations-2.16.2
- jackson-core-2.16.2
- jackson-databind-2.16.2
2 changes: 1 addition & 1 deletion bin/kafka-run-class.sh
@@ -225,7 +225,7 @@ if [ -z "$KAFKA_LOG4J_OPTS" ]; then
(( WINDOWS_OS_FORMAT )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")
KAFKA_LOG4J_OPTS="-Dlog4j2.configurationFile=${LOG4J_DIR}"
else
if echo "$KAFKA_LOG4J_OPTS" | grep -E "log4j\.[^[:space:]]+(\.properties|\.xml)$"; then
if echo "$KAFKA_LOG4J_OPTS" | grep -E "log4j\.[^[:space:]]+(\.properties|\.xml)$" >/dev/null; then
# Enable Log4j 1.x configuration compatibility mode for Log4j 2
export LOG4J_COMPATIBILITY=true
echo DEPRECATED: A Log4j 1.x configuration file has been detected, which is no longer recommended. >&2
14 changes: 13 additions & 1 deletion build.gradle
@@ -200,7 +200,10 @@ allprojects {
// ensure we have a single version in the classpath despite transitive dependencies
libs.scalaLibrary,
libs.scalaReflect,
libs.jacksonAnnotations
// Workaround before `commons-validator` has new release. See KAFKA-19359.
libs.commonsBeanutils,
libs.jacksonAnnotations,
libs.commonsLang
)
}
}
@@ -1130,6 +1133,7 @@ project(':core') {
testImplementation project(':test-common:test-common-util')
testImplementation libs.bcpkix
testImplementation libs.mockitoCore
testImplementation libs.jqwik
testImplementation(libs.apacheda) {
exclude group: 'xml-apis', module: 'xml-apis'
// `mina-core` is a transitive dependency for `apacheds` and `apacheda`.
@@ -1336,6 +1340,12 @@ project(':core') {
)
}

test {
useJUnitPlatform {
includeEngines 'jqwik', 'junit-jupiter'
}
}

tasks.create(name: "copyDependantTestLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('*.jar')
@@ -1903,6 +1913,7 @@ project(':clients') {
testImplementation libs.jacksonJakartarsJsonProvider
testImplementation libs.jose4j
testImplementation libs.junitJupiter
testImplementation libs.jqwik
testImplementation libs.spotbugs
testImplementation libs.mockitoCore
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
@@ -2291,6 +2302,7 @@ project(':storage') {
implementation project(':transaction-coordinator')
implementation(libs.caffeine) {
exclude group: 'org.checkerframework', module: 'checker-qual'
exclude group: 'com.google.errorprone', module: 'error_prone_annotations'
}
implementation libs.slf4jApi
implementation libs.jacksonDatabind
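For orientation only (not part of this PR's diff): the build.gradle changes above add the jqwik test dependency to the core and clients modules and register the 'jqwik' engine alongside 'junit-jupiter', because jqwik property tests are discovered by their own JUnit Platform engine. A minimal, hypothetical property test that this engine would pick up might look like:

import net.jqwik.api.ForAll;
import net.jqwik.api.Property;

class StringConcatProperties {
    // Discovered by the 'jqwik' engine enabled via useJUnitPlatform above.
    @Property
    boolean concatenationPreservesLength(@ForAll String a, @ForAll String b) {
        return (a + b).length() == a.length() + b.length();
    }
}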
2 changes: 2 additions & 0 deletions checkstyle/import-control.xml
@@ -194,6 +194,8 @@
<allow pkg="io.opentelemetry.proto"/>
<!-- for testing -->
<allow pkg="org.apache.kafka.common.telemetry" />
<!-- for IncrementalAlterConfigsRequest and AlterUserScramCredentialsRequest -->
<allow pkg="com.fasterxml.jackson.databind" />
</subpackage>

<subpackage name="serialization">
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
@@ -105,7 +105,7 @@
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>

<suppress checks="NPathComplexity"
files="(AbstractMembershipManager|ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
files="(AbstractMembershipManager|ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell|MockConsumer).java"/>

<suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
files="CoordinatorClient.java"/>
@@ -577,10 +577,12 @@ static KafkaAdminClient createInternal(AdminClientConfig config,
Time time) {
Metrics metrics = null;
String clientId = generateClientId(config);
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
Optional<ClientTelemetryReporter> clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
clientTelemetryReporter.ifPresent(reporters::add);

try {
metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time);
metrics = new Metrics(new MetricConfig(), reporters, time);
LogContext logContext = createLogContext(clientId);
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics,
client, null, logContext, clientTelemetryReporter);
@@ -625,9 +627,7 @@ private KafkaAdminClient(AdminClientConfig config,
CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
retryBackoffMaxMs,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this.clientId, config);
this.clientTelemetryReporter = clientTelemetryReporter;
this.clientTelemetryReporter.ifPresent(reporters::add);
this.metadataRecoveryStrategy = MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
this.partitionLeaderCache = new HashMap<>();
this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);
@@ -36,6 +36,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -79,6 +80,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private Uuid clientInstanceId;
private int injectTimeoutExceptionCounter;

private long maxPollRecords = Long.MAX_VALUE;

private final List<KafkaMetric> addedMetrics = new ArrayList<>();

/**
@@ -275,14 +278,22 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
// update the consumed offset
final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = new HashMap<>();
final List<TopicPartition> toClear = new ArrayList<>();
long numPollRecords = 0L;

final Iterator<Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>>> partitionsIter = this.records.entrySet().iterator();
while (partitionsIter.hasNext() && numPollRecords < this.maxPollRecords) {
Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry = partitionsIter.next();

for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
if (!subscriptions.isPaused(entry.getKey())) {
final List<ConsumerRecord<K, V>> recs = entry.getValue();
for (final ConsumerRecord<K, V> rec : recs) {
final Iterator<ConsumerRecord<K, V>> recIterator = entry.getValue().iterator();
while (recIterator.hasNext()) {
if (numPollRecords >= this.maxPollRecords) {
break;
}
long position = subscriptions.position(entry.getKey()).offset;

final ConsumerRecord<K, V> rec = recIterator.next();

if (beginningOffsets.get(entry.getKey()) != null && beginningOffsets.get(entry.getKey()) > position) {
throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), position));
}
@@ -294,13 +305,17 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
rec.offset() + 1, rec.leaderEpoch(), leaderAndEpoch);
subscriptions.position(entry.getKey(), newPosition);
nextOffsetAndMetadata.put(entry.getKey(), new OffsetAndMetadata(rec.offset() + 1, rec.leaderEpoch(), ""));
numPollRecords++;
recIterator.remove();
}
}
toClear.add(entry.getKey());

if (entry.getValue().isEmpty()) {
partitionsIter.remove();
}
}
}

toClear.forEach(records::remove);
return new ConsumerRecords<>(results, nextOffsetAndMetadata);
}

@@ -314,6 +329,18 @@ public synchronized void addRecord(ConsumerRecord<K, V> record) {
recs.add(record);
}

/**
* Sets the maximum number of records returned in a single call to {@link #poll(Duration)}.
*
* @param maxPollRecords the max.poll.records.
*/
public synchronized void setMaxPollRecords(long maxPollRecords) {
if (maxPollRecords < 1) {
throw new IllegalArgumentException("maxPollRecords must be greater than 0");
}
this.maxPollRecords = maxPollRecords;
}

public synchronized void setPollException(KafkaException exception) {
this.pollException = exception;
}
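For context, a minimal sketch of how the new MockConsumer#setMaxPollRecords could be used in a test (not part of this diff; topic name, keys and values are made up, and it assumes the String-accepting MockConsumer constructor from 4.0 — older code would pass OffsetResetStrategy.EARLIEST instead):

import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerMaxPollExample {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>("earliest");
        TopicPartition tp = new TopicPartition("test-topic", 0);
        consumer.assign(List.of(tp));
        consumer.updateBeginningOffsets(Map.of(tp, 0L));

        // Cap the number of records handed out per poll, mirroring max.poll.records.
        consumer.setMaxPollRecords(2);
        for (long offset = 0; offset < 5; offset++) {
            consumer.addRecord(new ConsumerRecord<>("test-topic", 0, offset, "key-" + offset, "value-" + offset));
        }

        // The first poll returns at most 2 records; the remaining 3 stay buffered
        // for subsequent polls instead of being returned all at once.
        ConsumerRecords<String, String> first = consumer.poll(Duration.ofMillis(10));
        System.out.println(first.count()); // expected: 2
    }
}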
@@ -1263,23 +1263,25 @@ RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndM
final Generation generation;
final String groupInstanceId;
if (subscriptions.hasAutoAssignedPartitions()) {
generation = generationIfStable();
groupInstanceId = rebalanceConfig.groupInstanceId.orElse(null);
// if the generation is null, we are not part of an active group (and we expect to be).
// the only thing we can do is fail the commit and let the user rejoin the group in poll().
if (generation == null) {
log.info("Failing OffsetCommit request since the consumer is not part of an active group");

if (rebalanceInProgress()) {
// if the client knows it is already rebalancing, we can use RebalanceInProgressException instead of
// CommitFailedException to indicate this is not a fatal error
return RequestFuture.failure(new RebalanceInProgressException("Offset commit cannot be completed since the " +
"consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance " +
"by calling poll() and then retry the operation."));
} else {
return RequestFuture.failure(new CommitFailedException("Offset commit cannot be completed since the " +
"consumer is not part of an active group for auto partition assignment; it is likely that the consumer " +
"was kicked out of the group."));
synchronized (ConsumerCoordinator.this) {
generation = generationIfStable();
groupInstanceId = rebalanceConfig.groupInstanceId.orElse(null);
// if the generation is null, we are not part of an active group (and we expect to be).
// the only thing we can do is fail the commit and let the user rejoin the group in poll().
if (generation == null) {
log.info("Failing OffsetCommit request since the consumer is not part of an active group");

if (rebalanceInProgress()) {
// if the client knows it is already rebalancing, we can use RebalanceInProgressException instead of
// CommitFailedException to indicate this is not a fatal error
return RequestFuture.failure(new RebalanceInProgressException("Offset commit cannot be completed since the " +
"consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance " +
"by calling poll() and then retry the operation."));
} else {
return RequestFuture.failure(new CommitFailedException("Offset commit cannot be completed since the " +
"consumer is not part of an active group for auto partition assignment; it is likely that the consumer " +
"was kicked out of the group."));
}
}
}
} else {
@@ -80,8 +80,9 @@ public class TopicConfig {
"Moreover, it triggers the rolling of new segment if the retention.ms condition is satisfied.";

public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG = "remote.storage.enable";
public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable tiered storage for a topic, set this configuration as true. " +
"You can not disable this config once it is enabled. It will be provided in future versions.";
public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable tiered storage for a topic, set this configuration to true. " +
"To disable tiered storage for a topic that has it enabled, set this configuration to false. " +
"When disabling, you must also set <code>remote.log.delete.on.disable</code> to true.";

public static final String LOCAL_LOG_RETENTION_MS_CONFIG = "local.retention.ms";
public static final String LOCAL_LOG_RETENTION_MS_DOC = "The number of milliseconds to keep the local log segment before it gets deleted. " +
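As an illustration only (not part of this diff): disabling tiered storage for a topic per the updated doc could look like the sketch below. The bootstrap address and topic name are placeholders, and remote.log.delete.on.disable is written as a string literal rather than a TopicConfig constant.

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class DisableTieredStorageExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "tiered-topic"); // placeholder
            Collection<AlterConfigOp> ops = List.of(
                new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), AlterConfigOp.OpType.SET),
                // Required when disabling, per the updated remote.storage.enable doc above.
                new AlterConfigOp(new ConfigEntry("remote.log.delete.on.disable", "true"), AlterConfigOp.OpType.SET)
            );
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}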
@@ -208,7 +208,7 @@ public static String toHtml() {
// Responses
b.append("<b>Responses:</b><br>\n");
Schema[] responses = key.messageType.responseSchemas();
for (int version = key.oldestVersion(); version < key.latestVersion(); version++) {
for (int version = key.oldestVersion(); version <= key.latestVersion(); version++) {
Schema schema = responses[version];
if (schema == null)
throw new IllegalStateException("Unexpected null schema for " + key + " with version " + version);
@@ -159,7 +159,7 @@ public void ensureValid() {

/**
* Gets the base timestamp of the batch which is used to calculate the record timestamps from the deltas.
*
*
* @return The base timestamp
*/
public long baseTimestamp() {
@@ -502,6 +502,7 @@ public static void writeHeader(ByteBuffer buffer,
public String toString() {
return "RecordBatch(magic=" + magic() + ", offsets=[" + baseOffset() + ", " + lastOffset() + "], " +
"sequence=[" + baseSequence() + ", " + lastSequence() + "], " +
"partitionLeaderEpoch=" + partitionLeaderEpoch() + ", " +
"isTransactional=" + isTransactional() + ", isControlBatch=" + isControlBatch() + ", " +
"compression=" + compressionType() + ", timestampType=" + timestampType() + ", crc=" + checksum() + ")";
}
@@ -201,6 +201,10 @@ public void flush() throws IOException {
* Close this record set
*/
public void close() throws IOException {
if (!channel.isOpen()) {
return;
}

flush();
trim();
channel.close();
@@ -33,9 +33,6 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
@@ -50,7 +47,6 @@
* or one of the {@link #builder(ByteBuffer, byte, Compression, TimestampType, long)} variants.
*/
public class MemoryRecords extends AbstractRecords {
private static final Logger log = LoggerFactory.getLogger(MemoryRecords.class);
public static final MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));

private final ByteBuffer buffer;
@@ -602,7 +598,7 @@ public static MemoryRecords withRecords(byte magic, long initialOffset, Compress
return withRecords(magic, initialOffset, compression, TimestampType.CREATE_TIME, records);
}

public static MemoryRecords withRecords(long initialOffset, Compression compression, Integer partitionLeaderEpoch, SimpleRecord... records) {
public static MemoryRecords withRecords(long initialOffset, Compression compression, int partitionLeaderEpoch, SimpleRecord... records) {
return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compression, TimestampType.CREATE_TIME, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, false, records);
}
@@ -136,7 +136,7 @@ public String toString(boolean verbose) {
}

@Override
public final String toString() {
public String toString() {
return toString(true);
}

@@ -17,10 +17,14 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
@@ -82,4 +86,16 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
.collect(Collectors.toList());
return new AlterUserScramCredentialsResponse(new AlterUserScramCredentialsResponseData().setResults(results));
}

// Do not print salt or saltedPassword
@Override
public String toString() {
JsonNode json = AlterUserScramCredentialsRequestDataJsonConverter.write(data, version()).deepCopy();

for (JsonNode upsertion : json.get("upsertions")) {
((ObjectNode) upsertion).put("salt", "");
((ObjectNode) upsertion).put("saltedPassword", "");
}
return AlterUserScramCredentialsRequestDataJsonConverter.read(json, version()).toString();
}
}