@@ -55,6 +55,8 @@ List<FindBatchResponse> findBatches(

void createTopicAndPartitions(Set<CreateTopicAndPartitionsRequest> requests);

void initDisklessLog(Set<InitDisklessLogRequest> requests);

List<DeleteRecordsResponse> deleteRecords(List<DeleteRecordsRequest> requests);

void deleteTopics(Set<Uuid> topicIds);
@@ -94,6 +96,8 @@ static ControlPlane create(final InklessConfig config, final Time time) {

boolean isSafeToDeleteFile(String objectKeyPath);

List<GetDisklessLogResponse> getDisklessLog(List<GetDisklessLogRequest> requests);

// used for testing purposes only
List<GetLogInfoResponse> getLogInfo(List<GetLogInfoRequest> requests);
}
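
As a rough illustration of how the two new ControlPlane entry points fit together, here is a minimal caller sketch. Only the initDisklessLog and getDisklessLog signatures come from this diff; the helper class, its name, and the way the requests are obtained are hypothetical.

// Hypothetical caller sketch; only the ControlPlane method signatures come from this diff.
import io.aiven.inkless.control_plane.ControlPlane;
import io.aiven.inkless.control_plane.GetDisklessLogRequest;
import io.aiven.inkless.control_plane.GetDisklessLogResponse;
import io.aiven.inkless.control_plane.InitDisklessLogRequest;

import java.util.List;
import java.util.Set;

class DisklessInitCallerSketch {
    // Seeds the control plane with the migration offsets, then reads them back.
    static List<GetDisklessLogResponse> initAndRead(final ControlPlane controlPlane,
                                                    final Set<InitDisklessLogRequest> initRequests,
                                                    final List<GetDisklessLogRequest> readRequests) {
        controlPlane.initDisklessLog(initRequests);       // may throw for stale epochs or already-initialized logs
        return controlPlane.getDisklessLog(readRequests); // one response per request
    }
}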
@@ -0,0 +1,44 @@
/*
* Inkless
* Copyright (C) 2026 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.control_plane;

import org.apache.kafka.common.Uuid;

/**
* Exception thrown when attempting to initialize a diskless log that has already been initialized.
*/
public class DisklessLogAlreadyInitializedException extends ControlPlaneException {

private final Uuid topicId;
private final int partition;

public DisklessLogAlreadyInitializedException(final Uuid topicId, final int partition) {
super(String.format("Diskless log already initialized for topic %s partition %d",
topicId, partition));
this.topicId = topicId;
this.partition = partition;
}

public Uuid topicId() {
return topicId;
}

public int partition() {
return partition;
}
}
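
Because the exception carries the topic ID and partition, a caller initializing a batch of partitions can report exactly which one was rejected. The sketch below is hypothetical handling code, not part of this PR; only the exception type, its accessors, and the initDisklessLog signature are taken from the diff.

// Hypothetical handling sketch; the exception type and its accessors come from this diff.
import io.aiven.inkless.control_plane.ControlPlane;
import io.aiven.inkless.control_plane.DisklessLogAlreadyInitializedException;
import io.aiven.inkless.control_plane.InitDisklessLogRequest;

import java.util.Set;

class InitDisklessLogCaller {
    static void initIgnoringAlreadyInitialized(final ControlPlane controlPlane,
                                               final Set<InitDisklessLogRequest> requests) {
        try {
            controlPlane.initDisklessLog(requests);
        } catch (final DisklessLogAlreadyInitializedException e) {
            // The partition already has diskless batches appended past the migration point;
            // the accessors pinpoint which topic/partition triggered the rejection.
            System.err.printf("Skipping re-init of %s-%d%n", e.topicId(), e.partition());
        }
    }
}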
@@ -0,0 +1,24 @@
/*
* Inkless
* Copyright (C) 2026 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.control_plane;

import org.apache.kafka.common.Uuid;

public record GetDisklessLogRequest(Uuid topicId,
int partition) {
}
@@ -0,0 +1,42 @@
/*
* Inkless
* Copyright (C) 2026 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.control_plane;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.Errors;

public record GetDisklessLogResponse(Uuid topicId,
int partition,
Errors error,
long logStartOffset,
long highWatermark,
Long disklessStartOffset) {
public static final long INVALID_OFFSET = -1L;

public static GetDisklessLogResponse success(final Uuid topicId,
final int partition,
final long logStartOffset,
final long highWatermark,
final Long disklessStartOffset) {
return new GetDisklessLogResponse(topicId, partition, Errors.NONE, logStartOffset, highWatermark, disklessStartOffset);
}

public static GetDisklessLogResponse unknownTopicOrPartition(final Uuid topicId, final int partition) {
return new GetDisklessLogResponse(topicId, partition, Errors.UNKNOWN_TOPIC_OR_PARTITION, INVALID_OFFSET, INVALID_OFFSET, INVALID_OFFSET);
}
}
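
One way a reader of these responses might interpret them is sketched below. The consumer class is hypothetical; the record shape and the Errors usage come from this diff. Note that disklessStartOffset is a boxed Long, and the in-memory implementation later in this diff can return null for it in a success response, so the sketch treats it as nullable.

// Hypothetical consumer of the responses; only the record shape comes from this diff.
import io.aiven.inkless.control_plane.GetDisklessLogResponse;
import org.apache.kafka.common.protocol.Errors;

import java.util.List;

class DisklessLogReader {
    static void logResponses(final List<GetDisklessLogResponse> responses) {
        for (final GetDisklessLogResponse response : responses) {
            if (response.error() != Errors.NONE) {
                // e.g. Errors.UNKNOWN_TOPIC_OR_PARTITION with all offsets set to INVALID_OFFSET
                System.out.printf("%s-%d: %s%n", response.topicId(), response.partition(), response.error());
            } else {
                // disklessStartOffset is a boxed Long and may be null if it was never initialized
                System.out.printf("%s-%d: logStartOffset=%d highWatermark=%d disklessStartOffset=%s%n",
                    response.topicId(), response.partition(),
                    response.logStartOffset(), response.highWatermark(), response.disklessStartOffset());
            }
        }
    }
}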
@@ -90,6 +90,70 @@ public synchronized void createTopicAndPartitions(final Set<CreateTopicAndPartit
}
}

@Override
public synchronized void initDisklessLog(final Set<InitDisklessLogRequest> requests) {
for (final InitDisklessLogRequest request : requests) {
final TopicIdPartition topicIdPartition = new TopicIdPartition(
request.topicId(), request.partition(), request.topicName());

final LogInfo existingLogInfo = logs.get(topicIdPartition);
if (existingLogInfo != null) {
// Check if leader epoch is stale
if (request.leaderEpoch() < existingLogInfo.leaderEpochAtInit) {
throw new StaleLeaderEpochException(
request.topicId(), request.partition(), request.leaderEpoch());
}

// Check for invalid state: disklessStartOffset should never exceed highWatermark
if (existingLogInfo.disklessStartOffset > existingLogInfo.highWatermark) {
throw new IllegalStateException(String.format(
"Invalid state for %s: disklessStartOffset (%d) > highWatermark (%d)",
topicIdPartition, existingLogInfo.disklessStartOffset, existingLogInfo.highWatermark));
}

// Check if messages have been appended (no longer in migration phase)
if (existingLogInfo.disklessStartOffset < existingLogInfo.highWatermark) {
throw new DisklessLogAlreadyInitializedException(
request.topicId(), request.partition());
}

// Still in migration phase with valid epoch - update existing log
LOGGER.info("Updating {} with logStartOffset {}, disklessStartOffset {}, leaderEpoch {}",
topicIdPartition, request.logStartOffset(), request.disklessStartOffset(), request.leaderEpoch());
existingLogInfo.logStartOffset = request.logStartOffset();
existingLogInfo.highWatermark = request.disklessStartOffset();
existingLogInfo.disklessStartOffset = request.disklessStartOffset();
existingLogInfo.leaderEpochAtInit = request.leaderEpoch();

// Clear existing producer state for this partition
producers.remove(topicIdPartition);
} else {
// Create new log entry
LOGGER.info("Initializing {} with logStartOffset {}, disklessStartOffset {}, leaderEpoch {}",
topicIdPartition, request.logStartOffset(), request.disklessStartOffset(), request.leaderEpoch());
final LogInfo logInfo = new LogInfo();
logInfo.logStartOffset = request.logStartOffset();
logInfo.highWatermark = request.disklessStartOffset();
logInfo.disklessStartOffset = request.disklessStartOffset();
logInfo.leaderEpochAtInit = request.leaderEpoch();
logs.put(topicIdPartition, logInfo);
batches.putIfAbsent(topicIdPartition, new TreeMap<>());
}

// Insert producer state entries
if (request.producerStateEntries() != null && !request.producerStateEntries().isEmpty()) {
final TreeMap<Long, LatestProducerState> partitionProducers =
producers.computeIfAbsent(topicIdPartition, k -> new TreeMap<>());
for (final ProducerStateSnapshot entry : request.producerStateEntries()) {
final LatestProducerState producerState = partitionProducers
.computeIfAbsent(entry.producerId(), k -> LatestProducerState.empty(entry.producerEpoch()));
producerState.addElement(entry.baseSequence(), entry.lastSequence(),
entry.assignedOffset(), entry.batchMaxTimestamp());
}
}
}
}

@Override
protected synchronized Iterator<CommitBatchResponse> commitFileForValidRequests(
final String objectKey,
@@ -644,6 +708,27 @@ public boolean isSafeToDeleteFile(String objectKeyPath) {
return !files.containsKey(objectKeyPath);
}

@Override
public synchronized List<GetDisklessLogResponse> getDisklessLog(final List<GetDisklessLogRequest> requests) {
final List<GetDisklessLogResponse> result = new ArrayList<>();
for (final GetDisklessLogRequest request : requests) {
final TopicIdPartition tidp = findTopicIdPartition(request.topicId(), request.partition());
final LogInfo logInfo;
if (tidp == null || (logInfo = logs.get(tidp)) == null) {
result.add(GetDisklessLogResponse.unknownTopicOrPartition(request.topicId(), request.partition()));
} else {
result.add(GetDisklessLogResponse.success(
request.topicId(),
request.partition(),
logInfo.logStartOffset,
logInfo.highWatermark,
logInfo.disklessStartOffset
));
}
}
return result;
}

@Override
public synchronized List<GetLogInfoResponse> getLogInfo(final List<GetLogInfoRequest> requests) {
final List<GetLogInfoResponse> result = new ArrayList<>();
@@ -679,6 +764,8 @@ private static class LogInfo {
long logStartOffset = 0;
long highWatermark = 0;
long byteSize = 0;
Long disklessStartOffset = null;
int leaderEpochAtInit = 0;
}

private static class FileInfo {
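
The branching that initDisklessLog applies to an existing log can be summarized as a small decision function. The restatement below is purely illustrative: the class, enum, and method names are hypothetical, and the invariant that disklessStartOffset must not exceed the high watermark (which the implementation above turns into an IllegalStateException) is left as a comment.

// Illustrative restatement of the re-init decision above; not part of the PR.
final class ReinitDecisionSketch {
    enum Decision { STALE_EPOCH, ALREADY_INITIALIZED, UPDATE_EXISTING }

    static Decision decide(final int requestLeaderEpoch, final int leaderEpochAtInit,
                           final long disklessStartOffset, final long highWatermark) {
        if (requestLeaderEpoch < leaderEpochAtInit) {
            return Decision.STALE_EPOCH;          // initDisklessLog throws StaleLeaderEpochException
        }
        // Invariant enforced above: disklessStartOffset must not exceed highWatermark.
        if (disklessStartOffset < highWatermark) {
            return Decision.ALREADY_INITIALIZED;  // diskless batches were appended; DisklessLogAlreadyInitializedException
        }
        return Decision.UPDATE_EXISTING;          // still in migration phase: offsets and producer state get overwritten
    }
}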
@@ -0,0 +1,109 @@
/*
* Inkless
* Copyright (C) 2026 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.control_plane;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.storage.internals.log.BatchMetadata;
import org.apache.kafka.storage.internals.log.ProducerStateEntry;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
import org.apache.kafka.storage.internals.log.UnifiedLog;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Request to initialize the diskless log for a partition with its start offsets and producer state.
*
* @param topicId The topic ID
* @param topicName The topic name
* @param partition The partition number
* @param logStartOffset The log start offset
* @param disklessStartOffset The diskless start offset (same as high watermark when transitioning)
* @param leaderEpoch The leader epoch of the broker making the request. Used to reject stale requests.
* @param producerStateEntries The producer state entries to store during initialization
*/
public record InitDisklessLogRequest(Uuid topicId,
String topicName,
int partition,
long logStartOffset,
long disklessStartOffset,
int leaderEpoch,
List<ProducerStateSnapshot> producerStateEntries) {

/**
* Creates an InitDisklessLogRequest from a UnifiedLog.
* @param log The UnifiedLog to extract information from
* @param leaderEpoch The current leader epoch for this partition
* @return A request whose disklessStartOffset is the log's current high watermark, carrying the extracted producer state
* @throws IllegalStateException if the log does not have a topic ID
*/
public static InitDisklessLogRequest fromUnifiedLog(final UnifiedLog log, final int leaderEpoch) {
final Uuid topicId = log.topicId()
.orElseThrow(() -> new IllegalStateException("Topic ID is required for diskless initialization"));

final String topicName = log.topicPartition().topic();
final int partition = log.topicPartition().partition();
final long logStartOffset = log.logStartOffset();
final long highWatermark = log.highWatermark();

// Extract producer state entries
final List<ProducerStateSnapshot> producerStateEntries = extractProducerState(log.producerStateManager());

return new InitDisklessLogRequest(
topicId,
topicName,
partition,
logStartOffset,
highWatermark,
leaderEpoch,
producerStateEntries
);
}

/**
* Extracts the producer state from the ProducerStateManager.
* <p>
* For each active producer, this method extracts all retained batch metadata entries
* (up to {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN} per producer) to support
* duplicate detection after the transition to diskless.
*
* @param producerStateManager The local log's producer state manager
* @return A list of ProducerStateSnapshot entries for all active producers
*/
private static List<ProducerStateSnapshot> extractProducerState(final ProducerStateManager producerStateManager) {
final Map<Long, ProducerStateEntry> activeProducers = producerStateManager.activeProducers();
final List<ProducerStateSnapshot> snapshots = new ArrayList<>();

for (final Map.Entry<Long, ProducerStateEntry> entry : activeProducers.entrySet()) {
final long producerId = entry.getKey();
final ProducerStateEntry state = entry.getValue();

for (final BatchMetadata batch : state.batchMetadata()) {
snapshots.add(new ProducerStateSnapshot(
producerId,
state.producerEpoch(),
batch.firstSeq(),
batch.lastSeq,
batch.firstOffset(), // assigned offset is the first offset of the batch
batch.timestamp
));
}
}

return snapshots;
}
}
@@ -0,0 +1,40 @@
/*
* Inkless
* Copyright (C) 2026 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.control_plane;

/**
* Represents a single producer state entry to be stored in the control plane.
* This is used during the initialization of a diskless log to transfer producer state
* from the local log to the control plane.
*
* @param producerId The producer ID
* @param producerEpoch The producer epoch
* @param baseSequence The base sequence number of the batch
* @param lastSequence The last sequence number of the batch
* @param assignedOffset The offset assigned to this batch
* @param batchMaxTimestamp The maximum timestamp in the batch
*/
public record ProducerStateSnapshot(
long producerId,
short producerEpoch,
int baseSequence,
int lastSequence,
long assignedOffset,
long batchMaxTimestamp
) {
}
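
For reference, the request that InitDisklessLogRequest.fromUnifiedLog assembles can also be built by hand. The values below are made up purely to show the shape of the data; only the record signatures come from this diff.

// Illustrative values only; the record signatures are the ones added in this diff.
import io.aiven.inkless.control_plane.InitDisklessLogRequest;
import io.aiven.inkless.control_plane.ProducerStateSnapshot;
import org.apache.kafka.common.Uuid;

import java.util.List;

class InitDisklessLogRequestExample {
    static InitDisklessLogRequest example() {
        // One retained batch of producer 1234: sequences 0..99, assigned offsets starting at 5000.
        final ProducerStateSnapshot snapshot = new ProducerStateSnapshot(
            1234L, (short) 0, 0, 99, 5000L, 1700000000000L);
        return new InitDisklessLogRequest(
            Uuid.randomUuid(), "orders", 0,
            0L,     // logStartOffset
            5100L,  // disklessStartOffset == the classic log's high watermark at migration time
            7,      // leaderEpoch of the requesting broker
            List.of(snapshot));
    }
}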