From 1329854579448664bcd83c8a229cfbb7c3627086 Mon Sep 17 00:00:00 2001
From: Giuseppe Lillo
Date: Thu, 15 Jan 2026 15:58:15 +0100
Subject: [PATCH] Control Plane support for initializing the diskless log

Introduces control plane support for initializing diskless logs, enabling
the migration of topics from classic to diskless. The initialization
transfers log offsets and producer state.
---
 .../inkless/control_plane/ControlPlane.java   |   4 +
 ...isklessLogAlreadyInitializedException.java |  44 ++++
 .../control_plane/GetDisklessLogRequest.java  |  24 ++
 .../control_plane/GetDisklessLogResponse.java |  42 ++++
 .../control_plane/InMemoryControlPlane.java   |  87 +++++++
 .../control_plane/InitDisklessLogRequest.java | 109 +++++++++
 .../control_plane/ProducerStateSnapshot.java  |  40 +++
 .../StaleLeaderEpochException.java            |  51 ++++
 .../postgres/GetDisklessLogJob.java           | 117 +++++++++
 .../postgres/InitDisklessLogJob.java          | 141 +++++++++++
 .../postgres/PostgresControlPlane.java        |  23 ++
 .../postgres/PostgresControlPlaneMetrics.java |  12 +
 .../TopicsAndPartitionsCreateJob.java         |   5 +-
 .../V11__Add_diskless_offsets_to_logs.sql     | 154 ++++++++++++
 .../postgres/CommitFileJobTest.java           |  48 ++--
 .../control_plane/postgres/DBUtils.java       |  51 ++++
 .../postgres/DeleteRecordsJobTest.java        |   8 +-
 .../postgres/DeleteTopicJobTest.java          |   2 +-
 .../postgres/GetDisklessLogJobTest.java       | 203 ++++++++++++++++
 .../postgres/InitDisklessLogJobTest.java      | 229 ++++++++++++++++++
 .../TopicsAndPartitionsCreateJobTest.java     |  26 +-
 21 files changed, 1376 insertions(+), 44 deletions(-)
 create mode 100644 storage/inkless/src/main/java/io/aiven/inkless/control_plane/DisklessLogAlreadyInitializedException.java
 create mode 100644 storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogRequest.java
 create mode 100644 storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogResponse.java
 create mode 100644 storage/inkless/src/main/java/io/aiven/inkless/control_plane/InitDisklessLogRequest.java
 create mode 100644 storage/inkless/src/main/java/io/aiven/inkless/control_plane/ProducerStateSnapshot.java
 create mode 100644 storage/inkless/src/main/java/io/aiven/inkless/control_plane/StaleLeaderEpochException.java
 create mode 100644 storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJob.java
 create mode 100644 storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/InitDisklessLogJob.java
 create mode 100644 storage/inkless/src/main/resources/db/migration/V11__Add_diskless_offsets_to_logs.sql
 create mode 100644 storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJobTest.java
 create mode 100644 storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/InitDisklessLogJobTest.java

diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ControlPlane.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ControlPlane.java
index a008b02154..8d132d2a2a 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ControlPlane.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ControlPlane.java
@@ -55,6 +55,8 @@ List<FindBatchResponse> findBatches(
     void createTopicAndPartitions(Set<CreateTopicAndPartitionsRequest> requests);

+    void initDisklessLog(Set<InitDisklessLogRequest> requests);
+
     List<DeleteRecordsResponse> deleteRecords(List<DeleteRecordsRequest> requests);

     void deleteTopics(Set<Uuid> topicIds);

@@ -94,6 +96,8 @@ static ControlPlane create(final InklessConfig config, final Time time) {

     boolean isSafeToDeleteFile(String objectKeyPath);

+    List<GetDisklessLogResponse> getDisklessLog(List<GetDisklessLogRequest> requests);
+
     // used for testing purposes only
     List<GetLogInfoResponse> getLogInfo(List<GetLogInfoRequest> requests);
 }
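A minimal usage sketch (not part of this patch) of how a broker-side caller might drive the classic-to-diskless hand-off with the two new methods; how the caller obtains the ControlPlane, the partition's UnifiedLog, and the current leader epoch is assumed, and the class name is hypothetical:

    import java.util.List;
    import java.util.Set;
    import org.apache.kafka.storage.internals.log.UnifiedLog;
    import io.aiven.inkless.control_plane.*;

    class DisklessMigrationExample {
        // Snapshot offsets and producer state from the classic log, then hand them to the control plane.
        static void migrate(final ControlPlane controlPlane, final UnifiedLog log, final int leaderEpoch) {
            final InitDisklessLogRequest request = InitDisklessLogRequest.fromUnifiedLog(log, leaderEpoch);
            try {
                controlPlane.initDisklessLog(Set.of(request));
            } catch (final StaleLeaderEpochException e) {
                // A newer leader already initialized this partition; this broker should back off.
            } catch (final DisklessLogAlreadyInitializedException e) {
                // Diskless appends already happened for this partition; nothing left to transfer.
            }
            // Read back what the control plane now considers the diskless log boundaries.
            final List<GetDisklessLogResponse> state = controlPlane.getDisklessLog(
                List.of(new GetDisklessLogRequest(request.topicId(), request.partition())));
            System.out.println(state);
        }
    }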
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/DisklessLogAlreadyInitializedException.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/DisklessLogAlreadyInitializedException.java
new file mode 100644
index 0000000000..e242b27a4b
--- /dev/null
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/DisklessLogAlreadyInitializedException.java
@@ -0,0 +1,44 @@
+/*
+ * Inkless
+ * Copyright (C) 2026 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane;
+
+import org.apache.kafka.common.Uuid;
+
+/**
+ * Exception thrown when attempting to initialize a diskless log that has already been initialized.
+ */
+public class DisklessLogAlreadyInitializedException extends ControlPlaneException {
+
+    private final Uuid topicId;
+    private final int partition;
+
+    public DisklessLogAlreadyInitializedException(final Uuid topicId, final int partition) {
+        super(String.format("Diskless log already initialized for topic %s partition %d",
+            topicId, partition));
+        this.topicId = topicId;
+        this.partition = partition;
+    }
+
+    public Uuid topicId() {
+        return topicId;
+    }
+
+    public int partition() {
+        return partition;
+    }
+}
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogRequest.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogRequest.java
new file mode 100644
index 0000000000..2855735853
--- /dev/null
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogRequest.java
@@ -0,0 +1,24 @@
+/*
+ * Inkless
+ * Copyright (C) 2026 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane;
+
+import org.apache.kafka.common.Uuid;
+
+public record GetDisklessLogRequest(Uuid topicId,
+                                    int partition) {
+}
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogResponse.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogResponse.java
new file mode 100644
index 0000000000..a69663e44f
--- /dev/null
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogResponse.java
@@ -0,0 +1,42 @@
+/*
+ * Inkless
+ * Copyright (C) 2026 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+
+public record GetDisklessLogResponse(Uuid topicId,
+                                     int partition,
+                                     Errors error,
+                                     long logStartOffset,
+                                     long highWatermark,
+                                     Long disklessStartOffset) {
+    public static final long INVALID_OFFSET = -1L;
+
+    public static GetDisklessLogResponse success(final Uuid topicId,
+                                                 final int partition,
+                                                 final long logStartOffset,
+                                                 final long highWatermark,
+                                                 final Long disklessStartOffset) {
+        return new GetDisklessLogResponse(topicId, partition, Errors.NONE, logStartOffset, highWatermark, disklessStartOffset);
+    }
+
+    public static GetDisklessLogResponse unknownTopicOrPartition(final Uuid topicId, final int partition) {
+        return new GetDisklessLogResponse(topicId, partition, Errors.UNKNOWN_TOPIC_OR_PARTITION, INVALID_OFFSET, INVALID_OFFSET, INVALID_OFFSET);
+    }
+}
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InMemoryControlPlane.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InMemoryControlPlane.java
index aa99e79e84..83e7acf5b7 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InMemoryControlPlane.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InMemoryControlPlane.java
@@ -90,6 +90,70 @@ public synchronized void createTopicAndPartitions(final Set<CreateTopicAndPartitionsRequest> requests) {
+    @Override
+    public synchronized void initDisklessLog(final Set<InitDisklessLogRequest> requests) {
+        for (final InitDisklessLogRequest request : requests) {
+            final TopicIdPartition topicIdPartition = new TopicIdPartition(
+                request.topicId(), request.partition(), request.topicName());
+
+            final LogInfo existingLogInfo = logs.get(topicIdPartition);
+            if (existingLogInfo != null) {
+                // Check if leader epoch is stale
+                if (request.leaderEpoch() < existingLogInfo.leaderEpochAtInit) {
+                    throw new StaleLeaderEpochException(
+                        request.topicId(), request.partition(), request.leaderEpoch());
+                }
+
+                // Check for invalid state: disklessStartOffset should never exceed highWatermark
+                if (existingLogInfo.disklessStartOffset > existingLogInfo.highWatermark) {
+                    throw new IllegalStateException(String.format(
+                        "Invalid state for %s: disklessStartOffset (%d) > highWatermark (%d)",
+                        topicIdPartition, existingLogInfo.disklessStartOffset, existingLogInfo.highWatermark));
+                }
+
+                // Check if messages have been appended (no longer in migration phase)
+                if (existingLogInfo.disklessStartOffset < existingLogInfo.highWatermark) {
+                    throw new DisklessLogAlreadyInitializedException(
+                        request.topicId(), request.partition());
+                }
+
+                // Still in migration phase with valid epoch - update existing log
+                LOGGER.info("Updating {} with logStartOffset {}, disklessStartOffset {}, leaderEpoch {}",
+                    topicIdPartition, request.logStartOffset(), request.disklessStartOffset(), request.leaderEpoch());
+                existingLogInfo.logStartOffset = request.logStartOffset();
+                existingLogInfo.highWatermark = request.disklessStartOffset();
+                existingLogInfo.disklessStartOffset = request.disklessStartOffset();
+                existingLogInfo.leaderEpochAtInit = request.leaderEpoch();
+
+                // Clear existing producer state for this partition
+                producers.remove(topicIdPartition);
+            } else {
+                // Create new log entry
+                LOGGER.info("Initializing {} with logStartOffset {}, disklessStartOffset {}, leaderEpoch {}",
+                    topicIdPartition, request.logStartOffset(), request.disklessStartOffset(), request.leaderEpoch());
+                final LogInfo logInfo = new LogInfo();
+                logInfo.logStartOffset = request.logStartOffset();
+                logInfo.highWatermark = request.disklessStartOffset();
+                logInfo.disklessStartOffset = request.disklessStartOffset();
+                logInfo.leaderEpochAtInit = request.leaderEpoch();
+                logs.put(topicIdPartition, logInfo);
+                batches.putIfAbsent(topicIdPartition, new TreeMap<>());
+            }
+
+            // Insert producer state entries
+            if (request.producerStateEntries() != null && !request.producerStateEntries().isEmpty()) {
+                final TreeMap<Long, LatestProducerState> partitionProducers =
+                    producers.computeIfAbsent(topicIdPartition, k -> new TreeMap<>());
+                for (final ProducerStateSnapshot entry : request.producerStateEntries()) {
+                    final LatestProducerState producerState = partitionProducers
+                        .computeIfAbsent(entry.producerId(), k -> LatestProducerState.empty(entry.producerEpoch()));
+                    producerState.addElement(entry.baseSequence(), entry.lastSequence(),
+                        entry.assignedOffset(), entry.batchMaxTimestamp());
+                }
+            }
+        }
+    }
+
     @Override
     protected synchronized Iterator<CommitBatchResponse> commitFileForValidRequests(
         final String objectKey,
@@ -644,6 +708,27 @@ public boolean isSafeToDeleteFile(String objectKeyPath) {
         return !files.containsKey(objectKeyPath);
     }

+    @Override
+    public synchronized List<GetDisklessLogResponse> getDisklessLog(final List<GetDisklessLogRequest> requests) {
+        final List<GetDisklessLogResponse> result = new ArrayList<>();
+        for (final GetDisklessLogRequest request : requests) {
+            final TopicIdPartition tidp = findTopicIdPartition(request.topicId(), request.partition());
+            final LogInfo logInfo;
+            if (tidp == null || (logInfo = logs.get(tidp)) == null) {
+                result.add(GetDisklessLogResponse.unknownTopicOrPartition(request.topicId(), request.partition()));
+            } else {
+                result.add(GetDisklessLogResponse.success(
+                    request.topicId(),
+                    request.partition(),
+                    logInfo.logStartOffset,
+                    logInfo.highWatermark,
+                    logInfo.disklessStartOffset
+                ));
+            }
+        }
+        return result;
+    }
+
     @Override
     public synchronized List<GetLogInfoResponse> getLogInfo(final List<GetLogInfoRequest> requests) {
         final List<GetLogInfoResponse> result = new ArrayList<>();
@@ -679,6 +764,8 @@ private static class LogInfo {
         long logStartOffset = 0;
         long highWatermark = 0;
         long byteSize = 0;
+        Long disklessStartOffset = null;
+        int leaderEpochAtInit = 0;
     }

     private static class FileInfo {
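The checks above (mirrored by the SQL function later in this patch) can be read as a small decision table; a hypothetical helper, for illustration only:

    class InitDisklessLogDecision {
        // Restates the init checks; not part of the patch. Return values mirror the SQL error enum.
        static String classify(final int requestLeaderEpoch,
                               final int leaderEpochAtInit,
                               final long disklessStartOffset,
                               final long highWatermark) {
            if (requestLeaderEpoch < leaderEpochAtInit) return "stale_leader_epoch"; // fenced: older leader retrying
            if (disklessStartOffset > highWatermark) return "invalid_state";         // corrupted: should never happen
            if (disklessStartOffset < highWatermark) return "already_initialized";   // diskless appends already landed
            return "none"; // still in migration phase: re-init overwrites offsets and producer state
        }
    }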
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InitDisklessLogRequest.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InitDisklessLogRequest.java
new file mode 100644
index 0000000000..678a0f0fd9
--- /dev/null
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InitDisklessLogRequest.java
@@ -0,0 +1,109 @@
+/*
+ * Inkless
+ * Copyright (C) 2026 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.storage.internals.log.BatchMetadata;
+import org.apache.kafka.storage.internals.log.ProducerStateEntry;
+import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Request to initialize the diskless start offset for a partition.
+ *
+ * @param topicId The topic ID
+ * @param topicName The topic name
+ * @param partition The partition number
+ * @param logStartOffset The log start offset
+ * @param disklessStartOffset The diskless start offset (same as high watermark when transitioning)
+ * @param leaderEpoch The leader epoch of the broker making the request. Used to reject stale requests.
+ * @param producerStateEntries The producer state entries to store during initialization
+ */
+public record InitDisklessLogRequest(Uuid topicId,
+                                     String topicName,
+                                     int partition,
+                                     long logStartOffset,
+                                     long disklessStartOffset,
+                                     int leaderEpoch,
+                                     List<ProducerStateSnapshot> producerStateEntries) {
+
+    /**
+     * Creates an InitDisklessLogRequest from a UnifiedLog.
+     * @param log The UnifiedLog to extract information from
+     * @param leaderEpoch The current leader epoch for this partition
+     * @throws IllegalStateException if the log does not have a topic ID
+     */
+    public static InitDisklessLogRequest fromUnifiedLog(final UnifiedLog log, final int leaderEpoch) {
+        final Uuid topicId = log.topicId()
+            .orElseThrow(() -> new IllegalStateException("Topic ID is required for diskless initialization"));
+
+        final String topicName = log.topicPartition().topic();
+        final int partition = log.topicPartition().partition();
+        final long logStartOffset = log.logStartOffset();
+        final long highWatermark = log.highWatermark();
+
+        // Extract producer state entries
+        final List<ProducerStateSnapshot> producerStateEntries = extractProducerState(log.producerStateManager());
+
+        return new InitDisklessLogRequest(
+            topicId,
+            topicName,
+            partition,
+            logStartOffset,
+            highWatermark,
+            leaderEpoch,
+            producerStateEntries
+        );
+    }
+
+    /**
+     * Extracts the producer state from the ProducerStateManager.
+     *
+     * For each active producer, this method extracts all retained batch metadata entries
+     * (up to {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN} per producer) to support
+     * duplicate detection after the transition to diskless.
+     *
+     * @return A list of ProducerStateSnapshot entries for all active producers
+     */
+    private static List<ProducerStateSnapshot> extractProducerState(final ProducerStateManager producerStateManager) {
+        final Map<Long, ProducerStateEntry> activeProducers = producerStateManager.activeProducers();
+        final List<ProducerStateSnapshot> snapshots = new ArrayList<>();
+
+        for (final Map.Entry<Long, ProducerStateEntry> entry : activeProducers.entrySet()) {
+            final long producerId = entry.getKey();
+            final ProducerStateEntry state = entry.getValue();
+
+            for (final BatchMetadata batch : state.batchMetadata()) {
+                snapshots.add(new ProducerStateSnapshot(
+                    producerId,
+                    state.producerEpoch(),
+                    batch.firstSeq(),
+                    batch.lastSeq,
+                    batch.firstOffset(), // assigned offset is the first offset of the batch
+                    batch.timestamp
+                ));
+            }
+        }
+
+        return snapshots;
+    }
+}
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ProducerStateSnapshot.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ProducerStateSnapshot.java
new file mode 100644
index 0000000000..037f914f57
--- /dev/null
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ProducerStateSnapshot.java
@@ -0,0 +1,40 @@
+/*
+ * Inkless
+ * Copyright (C) 2026 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane;
+
+/**
+ * Represents a single producer state entry to be stored in the control plane.
+ * This is used during the initialization of diskless log to transfer producer state
+ * from the local log to the control plane.
+ *
+ * @param producerId The producer ID
+ * @param producerEpoch The producer epoch
+ * @param baseSequence The base sequence number of the batch
+ * @param lastSequence The last sequence number of the batch
+ * @param assignedOffset The offset assigned to this batch
+ * @param batchMaxTimestamp The maximum timestamp in the batch
+ */
+public record ProducerStateSnapshot(
+    long producerId,
+    short producerEpoch,
+    int baseSequence,
+    int lastSequence,
+    long assignedOffset,
+    long batchMaxTimestamp
+) {
+}
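A hedged illustration (not part of the patch; all values are made up) of what one transferred entry represents:

    import io.aiven.inkless.control_plane.ProducerStateSnapshot;

    class ProducerStateTransferExample {
        public static void main(String[] args) {
            // One retained batch for producer 4001 (epoch 0): sequences 0..9, assigned base offset 1200.
            final ProducerStateSnapshot snapshot =
                new ProducerStateSnapshot(4001L, (short) 0, 0, 9, 1200L, 1_700_000_000_000L);
            // After the transfer, a diskless produce with the same (producerId, producerEpoch, sequence range)
            // can be detected as a duplicate and acknowledged with the already-assigned offset.
            System.out.println(snapshot);
        }
    }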
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/StaleLeaderEpochException.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/StaleLeaderEpochException.java
new file mode 100644
index 0000000000..089d43b51c
--- /dev/null
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/StaleLeaderEpochException.java
@@ -0,0 +1,51 @@
+/*
+ * Inkless
+ * Copyright (C) 2026 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane;
+
+import org.apache.kafka.common.Uuid;
+
+/**
+ * Exception thrown when an operation is rejected because the provided leader epoch
+ * is stale (less than the leader epoch recorded in the control plane at initialization).
+ */
+public class StaleLeaderEpochException extends ControlPlaneException {
+
+    private final Uuid topicId;
+    private final int partition;
+    private final int requestedEpoch;
+
+    public StaleLeaderEpochException(final Uuid topicId, final int partition, final int requestedEpoch) {
+        super(String.format("Stale leader epoch %d for topic %s partition %d",
+            requestedEpoch, topicId, partition));
+        this.topicId = topicId;
+        this.partition = partition;
+        this.requestedEpoch = requestedEpoch;
+    }
+
+    public Uuid topicId() {
+        return topicId;
+    }
+
+    public int partition() {
+        return partition;
+    }
+
+    public int requestedEpoch() {
+        return requestedEpoch;
+    }
+}
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJob.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJob.java
new file mode 100644
index 0000000000..18da56824f
--- /dev/null
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJob.java
@@ -0,0 +1,117 @@
+/*
+ * Inkless
+ * Copyright (C) 2026 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane.postgres;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Time;
+
+import org.jooq.Configuration;
+import org.jooq.DSLContext;
+import org.jooq.Field;
+import org.jooq.Row2;
+import org.jooq.impl.SQLDataType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+import io.aiven.inkless.control_plane.GetDisklessLogRequest;
+import io.aiven.inkless.control_plane.GetDisklessLogResponse;
+import io.aiven.inkless.control_plane.postgres.converters.UUIDtoUuidConverter;
+
+import static org.jooq.generated.Tables.LOGS;
+import static org.jooq.impl.DSL.field;
+import static org.jooq.impl.DSL.name;
+import static org.jooq.impl.DSL.row;
+import static org.jooq.impl.DSL.values;
+
+public class GetDisklessLogJob implements Callable<List<GetDisklessLogResponse>> {
+    private static final Field<UUID> REQUEST_TOPIC_ID = field(name("topic_id"), SQLDataType.UUID);
+    private static final Field<Integer> REQUEST_PARTITION = field(name("partition"), LOGS.PARTITION.getDataType());
+
+    private final Time time;
+    private final DSLContext jooqCtx;
+    private final List<GetDisklessLogRequest> requests;
+    private final Consumer<Long> durationCallback;
+
+    public GetDisklessLogJob(final Time time,
+                             final DSLContext jooqCtx,
+                             final List<GetDisklessLogRequest> requests,
+                             final Consumer<Long> durationCallback) {
+        this.time = time;
+        this.jooqCtx = jooqCtx;
+        this.requests = requests;
+        this.durationCallback = durationCallback;
+    }
+
+    @Override
+    public List<GetDisklessLogResponse> call() throws Exception {
+        if (requests.isEmpty()) {
+            return List.of();
+        }
+        return JobUtils.run(this::runOnce, time, durationCallback);
+    }
+
+    private List<GetDisklessLogResponse> runOnce() throws Exception {
+        return jooqCtx.transactionResult((final Configuration conf) -> {
+            final DSLContext context = conf.dsl();
+
+            final UUIDtoUuidConverter uuidConverter = new UUIDtoUuidConverter();
+            final var requestRows = requests.stream()
+                .map(req -> row(uuidConverter.to(req.topicId()), req.partition()))
+                .toArray(Row2[]::new);
+            @SuppressWarnings("unchecked")
+            final var requestsTable = values(requestRows)
+                .as("requests", REQUEST_TOPIC_ID.getName(), REQUEST_PARTITION.getName());
+
+            final var select = context.select(
+                requestsTable.field(REQUEST_TOPIC_ID),
+                requestsTable.field(REQUEST_PARTITION),
+                LOGS.LOG_START_OFFSET,
+                LOGS.HIGH_WATERMARK,
+                LOGS.DISKLESS_START_OFFSET
+            ).from(requestsTable)
+                .leftJoin(LOGS).on(LOGS.TOPIC_ID.coerce(SQLDataType.UUID).eq(requestsTable.field(REQUEST_TOPIC_ID))
+                    .and(LOGS.PARTITION.eq(requestsTable.field(REQUEST_PARTITION))));
+
+            final List<GetDisklessLogResponse> responses = new ArrayList<>();
+            try (final var cursor = select.fetchSize(1000).fetchLazy()) {
+                for (final var record : cursor) {
+                    final UUID rawTopicId = record.get(requestsTable.field(REQUEST_TOPIC_ID));
+                    final Uuid topicId = uuidConverter.from(rawTopicId);
+                    final Integer partition = record.get(requestsTable.field(REQUEST_PARTITION));
+                    final Long logStartOffset = record.get(LOGS.LOG_START_OFFSET);
+                    if (logStartOffset == null) {
+                        responses.add(GetDisklessLogResponse.unknownTopicOrPartition(topicId, partition));
+                    } else {
+                        responses.add(GetDisklessLogResponse.success(
+                            topicId,
+                            partition,
+                            logStartOffset,
+                            record.get(LOGS.HIGH_WATERMARK),
+                            record.get(LOGS.DISKLESS_START_OFFSET)
+                        ));
+                    }
+                }
+            }
+            return responses;
+        });
+    }
+}
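A small sketch (not part of the patch; class name hypothetical) of the lookup contract the left join above implements: every request row comes back exactly once, and misses are reported in-band rather than dropped:

    import java.util.List;
    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.protocol.Errors;
    import io.aiven.inkless.control_plane.*;

    class GetDisklessLogContractExample {
        static void check(final ControlPlane controlPlane) {
            final Uuid unknownTopic = Uuid.randomUuid(); // assumed absent from the logs table
            final List<GetDisklessLogResponse> responses =
                controlPlane.getDisklessLog(List.of(new GetDisklessLogRequest(unknownTopic, 0)));
            // The miss is reported per row, with offsets set to INVALID_OFFSET (-1).
            assert responses.size() == 1;
            assert responses.get(0).error() == Errors.UNKNOWN_TOPIC_OR_PARTITION;
            assert responses.get(0).logStartOffset() == GetDisklessLogResponse.INVALID_OFFSET;
        }
    }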
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/InitDisklessLogJob.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/InitDisklessLogJob.java
new file mode 100644
index 0000000000..40b884f8fc
--- /dev/null
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/InitDisklessLogJob.java
@@ -0,0 +1,141 @@
+/*
+ * Inkless
+ * Copyright (C) 2026 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane.postgres;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Time;
+
+import org.jooq.Configuration;
+import org.jooq.DSLContext;
+import org.jooq.generated.udt.InitDisklessLogResponseV1;
+import org.jooq.generated.udt.records.InitDisklessLogProducerStateV1Record;
+import org.jooq.generated.udt.records.InitDisklessLogRequestV1Record;
+import org.jooq.generated.udt.records.InitDisklessLogResponseV1Record;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+import io.aiven.inkless.control_plane.DisklessLogAlreadyInitializedException;
+import io.aiven.inkless.control_plane.InitDisklessLogRequest;
+import io.aiven.inkless.control_plane.ProducerStateSnapshot;
+import io.aiven.inkless.control_plane.StaleLeaderEpochException;
+
+import static org.jooq.generated.Tables.INIT_DISKLESS_LOG_V1;
+
+public class InitDisklessLogJob implements Callable<Void> {
+
+    private final Time time;
+    private final DSLContext jooqCtx;
+    private final Set<InitDisklessLogRequest> requests;
+    private final Consumer<Long> durationCallback;
+
+    InitDisklessLogJob(final Time time,
+                       final DSLContext jooqCtx,
+                       final Set<InitDisklessLogRequest> requests,
+                       final Consumer<Long> durationCallback) {
+        this.time = time;
+        this.jooqCtx = jooqCtx;
+        this.requests = requests;
+        this.durationCallback = durationCallback;
+    }
+
+    @Override
+    public Void call() {
+        if (requests.isEmpty()) {
+            return null;
+        }
+        return JobUtils.run(this::runOnce, time, durationCallback);
+    }
+
+    private Void runOnce() {
+        final InitDisklessLogRequestV1Record[] jooqRequests = requests.stream()
+            .map(this::toJooqRequest)
+            .toArray(InitDisklessLogRequestV1Record[]::new);
+
+        final List<InitDisklessLogResponseV1Record> responses = jooqCtx.transactionResult(
+            (final Configuration conf) -> conf.dsl().select(
+                InitDisklessLogResponseV1.TOPIC_ID,
+                InitDisklessLogResponseV1.PARTITION,
+                InitDisklessLogResponseV1.ERROR
+            ).from(INIT_DISKLESS_LOG_V1.call(jooqRequests))
+                .fetchInto(InitDisklessLogResponseV1Record.class)
+        );
+
+        for (final var response : responses) {
+            switch (response.getError()) {
+                case none -> { }
+                case stale_leader_epoch -> throw new StaleLeaderEpochException(
+                    response.getTopicId(),
+                    response.getPartition(),
+                    findLeaderEpoch(response.getTopicId(), response.getPartition())
+                );
+                case invalid_state -> throw new IllegalStateException(String.format(
+                    "Invalid state for topic %s partition %d: disklessStartOffset > highWatermark",
+                    response.getTopicId(), response.getPartition()
+                ));
+                case already_initialized -> throw new DisklessLogAlreadyInitializedException(
+                    response.getTopicId(),
+                    response.getPartition()
+                );
+            }
+        }
+        return null;
+    }
+    private InitDisklessLogRequestV1Record toJooqRequest(final InitDisklessLogRequest request) {
+        final InitDisklessLogProducerStateV1Record[] producerState;
+        if (request.producerStateEntries() == null || request.producerStateEntries().isEmpty()) {
+            producerState = new InitDisklessLogProducerStateV1Record[0];
+        } else {
+            producerState = request.producerStateEntries().stream()
+                .map(this::toJooqProducerState)
+                .toArray(InitDisklessLogProducerStateV1Record[]::new);
+        }
+
+        return new InitDisklessLogRequestV1Record(
+            request.topicId(),
+            request.partition(),
+            request.topicName(),
+            request.logStartOffset(),
+            request.disklessStartOffset(),
+            request.leaderEpoch(),
+            producerState
+        );
+    }
+
+    private InitDisklessLogProducerStateV1Record toJooqProducerState(final ProducerStateSnapshot entry) {
+        return new InitDisklessLogProducerStateV1Record(
+            entry.producerId(),
+            entry.producerEpoch(),
+            entry.baseSequence(),
+            entry.lastSequence(),
+            entry.assignedOffset(),
+            entry.batchMaxTimestamp()
+        );
+    }
+
+    private int findLeaderEpoch(final Uuid topicId, final int partition) {
+        return requests.stream()
+            .filter(r -> r.topicId().equals(topicId) && r.partition() == partition)
+            .findFirst()
+            .map(InitDisklessLogRequest::leaderEpoch)
+            .orElse(-1);
+    }
+}
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlane.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlane.java
index e76c869315..9d641236f4 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlane.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlane.java
@@ -57,8 +57,11 @@ import io.aiven.inkless.control_plane.FileToDelete;
 import io.aiven.inkless.control_plane.FindBatchRequest;
 import io.aiven.inkless.control_plane.FindBatchResponse;
+import io.aiven.inkless.control_plane.GetDisklessLogRequest;
+import io.aiven.inkless.control_plane.GetDisklessLogResponse;
 import io.aiven.inkless.control_plane.GetLogInfoRequest;
 import io.aiven.inkless.control_plane.GetLogInfoResponse;
+import io.aiven.inkless.control_plane.InitDisklessLogRequest;
 import io.aiven.inkless.control_plane.ListOffsetsRequest;
 import io.aiven.inkless.control_plane.ListOffsetsResponse;
 import io.aiven.inkless.control_plane.MergedFileBatch;
@@ -149,6 +152,12 @@ public void createTopicAndPartitions(final Set<CreateTopicAndPartitionsRequest> requests) {
         new TopicsAndPartitionsCreateJob(time, jobsJooqCtx, requests, pgMetrics::onTopicCreateCompleted).run();
     }

+    @Override
+    public void initDisklessLog(final Set<InitDisklessLogRequest> requests) {
+        // Expected to be performed synchronously
+        new InitDisklessLogJob(time, jobsJooqCtx, requests, pgMetrics::onInitDisklessLogCompleted).call();
+    }
+
     @Override
     protected Iterator<CommitBatchResponse> commitFileForValidRequests(
         final String objectKey,
@@ -333,6 +342,20 @@ public boolean isSafeToDeleteFile(String objectKeyPath) {
         }
     }

+    @Override
+    public List<GetDisklessLogResponse> getDisklessLog(final List<GetDisklessLogRequest> requests) {
+        try {
+            final GetDisklessLogJob job = new GetDisklessLogJob(time, readJooqCtx, requests, pgMetrics::onGetDisklessLogCompleted);
+            return job.call();
+        } catch (final Exception e) {
+            if (e instanceof ControlPlaneException) {
+                throw (ControlPlaneException) e;
+            } else {
+                throw new ControlPlaneException("Failed to get diskless log", e);
+            }
+        }
+    }
+
     @Override
     public List<GetLogInfoResponse> getLogInfo(final List<GetLogInfoRequest> requests) {
         try {
diff --git
a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlaneMetrics.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlaneMetrics.java index 3f8d55738e..89e1f76d8f 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlaneMetrics.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlaneMetrics.java @@ -36,6 +36,7 @@ public class PostgresControlPlaneMetrics implements Closeable { private final QueryMetrics commitFileMetrics = new QueryMetrics("CommitFile"); private final QueryMetrics commitFileMergeWorkItemMetrics = new QueryMetrics("CommitFileMergeWorkItem"); private final QueryMetrics topicCreateMetrics = new QueryMetrics("TopicCreate"); + private final QueryMetrics initDisklessLogMetrics = new QueryMetrics("InitDisklessLog"); private final QueryMetrics topicDeleteMetrics = new QueryMetrics("TopicDelete"); private final QueryMetrics fileDeleteMetrics = new QueryMetrics("FilesDelete"); private final QueryMetrics listOffsetsMetrics = new QueryMetrics("ListOffsets"); @@ -46,6 +47,7 @@ public class PostgresControlPlaneMetrics implements Closeable { private final QueryMetrics releaseFileMergeWorkItemMetrics = new QueryMetrics("ReleaseFileMergeWorkItem"); private final QueryMetrics safeDeleteFileCheckMetrics = new QueryMetrics("SafeDeleteFileCheck"); private final QueryMetrics getLogInfoMetrics = new QueryMetrics("GetLogInfo"); + private final QueryMetrics getDisklessLogMetrics = new QueryMetrics("GetDisklessLog"); public PostgresControlPlaneMetrics(Time time) { this.time = Objects.requireNonNull(time, "time cannot be null"); @@ -75,6 +77,10 @@ public void onTopicCreateCompleted(Long duration) { topicCreateMetrics.record(duration); } + public void onInitDisklessLogCompleted(Long duration) { + initDisklessLogMetrics.record(duration); + } + public void onFilesDeleteCompleted(Long duration) { fileDeleteMetrics.record(duration); } @@ -111,6 +117,10 @@ public void onGetLogInfoCompleted(Long duration) { getLogInfoMetrics.record(duration); } + public void onGetDisklessLogCompleted(Long duration) { + getDisklessLogMetrics.record(duration); + } + @Override public void close() { findBatchesMetrics.remove(); @@ -118,6 +128,7 @@ public void close() { commitFileMetrics.remove(); commitFileMergeWorkItemMetrics.remove(); topicCreateMetrics.remove(); + initDisklessLogMetrics.remove(); topicDeleteMetrics.remove(); fileDeleteMetrics.remove(); listOffsetsMetrics.remove(); @@ -128,6 +139,7 @@ public void close() { releaseFileMergeWorkItemMetrics.remove(); safeDeleteFileCheckMetrics.remove(); getLogInfoMetrics.remove(); + getDisklessLogMetrics.remove(); } private class QueryMetrics { diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJob.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJob.java index 6e77d0aad4..c699f04dc8 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJob.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJob.java @@ -69,10 +69,11 @@ private void runOnce() { LOGS.TOPIC_NAME, LOGS.LOG_START_OFFSET, LOGS.HIGH_WATERMARK, - LOGS.BYTE_SIZE); + LOGS.BYTE_SIZE, + LOGS.DISKLESS_START_OFFSET); for (final var request : requests) { for (int partition = 0; partition < request.numPartitions(); partition++) { - insertStep = 
insertStep.values(request.topicId(), partition, request.topicName(), 0L, 0L, 0L); + insertStep = insertStep.values(request.topicId(), partition, request.topicName(), 0L, 0L, 0L, 0L); } } final int rowsInserted = insertStep.onConflictDoNothing().execute(); diff --git a/storage/inkless/src/main/resources/db/migration/V11__Add_diskless_offsets_to_logs.sql b/storage/inkless/src/main/resources/db/migration/V11__Add_diskless_offsets_to_logs.sql new file mode 100644 index 0000000000..ee69f0b438 --- /dev/null +++ b/storage/inkless/src/main/resources/db/migration/V11__Add_diskless_offsets_to_logs.sql @@ -0,0 +1,154 @@ +-- Copyright (c) 2026 Aiven, Helsinki, Finland. https://aiven.io/ +CREATE DOMAIN leader_epoch_t AS INT NOT NULL +CHECK (VALUE >= 0); + +ALTER TABLE logs ADD COLUMN diskless_start_offset offset_t DEFAULT 0; +ALTER TABLE logs ADD COLUMN diskless_end_offset offset_nullable_t DEFAULT NULL; +ALTER TABLE logs ADD COLUMN leader_epoch_at_init leader_epoch_t DEFAULT 0; + +CREATE TYPE init_diskless_log_producer_state_v1 AS ( + producer_id producer_id_t, + producer_epoch producer_epoch_t, + base_sequence sequence_t, + last_sequence sequence_t, + assigned_offset offset_t, + batch_max_timestamp timestamp_t +); + +CREATE TYPE init_diskless_log_request_v1 AS ( + topic_id topic_id_t, + partition partition_t, + topic_name topic_name_t, + log_start_offset offset_t, + diskless_start_offset offset_t, + leader_epoch leader_epoch_t, + producer_state init_diskless_log_producer_state_v1[] +); + +CREATE TYPE init_diskless_log_response_error_v1 AS ENUM ( + 'none', + 'stale_leader_epoch', + 'already_initialized', + 'invalid_state' +); + +CREATE TYPE init_diskless_log_response_v1 AS ( + topic_id topic_id_t, + partition partition_t, + error init_diskless_log_response_error_v1 +); + +-- Init diskless log function: +-- - Rejects with stale_leader_epoch if leader_epoch < leader_epoch_at_init +-- - Rejects with invalid_state if diskless_start_offset > high_watermark (corrupted state) +-- - Rejects with already_initialized if messages have been appended (diskless_start_offset < high_watermark) +CREATE FUNCTION init_diskless_log_v1( + arg_requests init_diskless_log_request_v1[] +) +RETURNS SETOF init_diskless_log_response_v1 LANGUAGE plpgsql VOLATILE AS $$ +DECLARE + l_request RECORD; + l_existing_log RECORD; + l_producer_state RECORD; +BEGIN + FOR l_request IN + SELECT * + FROM unnest(arg_requests) + LOOP + -- Check if log already exists + SELECT topic_id, partition, leader_epoch_at_init, diskless_start_offset, high_watermark + INTO l_existing_log + FROM logs + WHERE topic_id = l_request.topic_id + AND partition = l_request.partition; + + IF FOUND THEN + -- Check if leader epoch is stale + IF l_request.leader_epoch < l_existing_log.leader_epoch_at_init THEN + RETURN NEXT (l_request.topic_id, l_request.partition, 'stale_leader_epoch')::init_diskless_log_response_v1; + CONTINUE; + END IF; + + -- Check for invalid state: diskless_start_offset should never exceed high_watermark + IF l_existing_log.diskless_start_offset > l_existing_log.high_watermark THEN + RETURN NEXT (l_request.topic_id, l_request.partition, 'invalid_state')::init_diskless_log_response_v1; + CONTINUE; + END IF; + + -- Check if messages have been appended (no longer in migration phase) + IF l_existing_log.diskless_start_offset < l_existing_log.high_watermark THEN + RETURN NEXT (l_request.topic_id, l_request.partition, 'already_initialized')::init_diskless_log_response_v1; + CONTINUE; + END IF; + + -- Still in migration phase with valid epoch - update 
existing log + UPDATE logs + SET log_start_offset = l_request.log_start_offset, + high_watermark = l_request.diskless_start_offset, + diskless_start_offset = l_request.diskless_start_offset, + leader_epoch_at_init = l_request.leader_epoch + WHERE topic_id = l_request.topic_id + AND partition = l_request.partition; + + -- Delete existing producer state for this partition + DELETE FROM producer_state + WHERE topic_id = l_request.topic_id + AND partition = l_request.partition; + ELSE + -- Insert new log record + INSERT INTO logs ( + topic_id, + partition, + topic_name, + log_start_offset, + high_watermark, + byte_size, + diskless_start_offset, + leader_epoch_at_init + ) + VALUES ( + l_request.topic_id, + l_request.partition, + l_request.topic_name, + l_request.log_start_offset, + l_request.diskless_start_offset, + 0, + l_request.diskless_start_offset, + l_request.leader_epoch + ); + END IF; + + -- Insert producer state entries + IF l_request.producer_state IS NOT NULL THEN + FOR l_producer_state IN + SELECT * + FROM unnest(l_request.producer_state) + LOOP + INSERT INTO producer_state ( + topic_id, + partition, + producer_id, + producer_epoch, + base_sequence, + last_sequence, + assigned_offset, + batch_max_timestamp + ) + VALUES ( + l_request.topic_id, + l_request.partition, + l_producer_state.producer_id, + l_producer_state.producer_epoch, + l_producer_state.base_sequence, + l_producer_state.last_sequence, + l_producer_state.assigned_offset, + l_producer_state.batch_max_timestamp + ); + END LOOP; + END IF; + + RETURN NEXT (l_request.topic_id, l_request.partition, 'none')::init_diskless_log_response_v1; + END LOOP; +END; +$$ +; \ No newline at end of file diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/CommitFileJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/CommitFileJobTest.java index 77daba9322..a255026231 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/CommitFileJobTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/CommitFileJobTest.java @@ -112,9 +112,9 @@ void simpleCommit() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L, 0L, null, 0) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -168,9 +168,9 @@ void commitMultipleFiles() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 159L, 111L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L + 245, 322L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 159L, 111L, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L + 245, 322L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L, 0L, null, 0) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -215,9 +215,9 @@ void nonExistentPartition() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L), - new LogsRecord(TOPIC_ID_1, 0, 
TOPIC_1, 0L, 27L, 50L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L, 0L, null, 0) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -249,9 +249,9 @@ void simpleIdempotentCommit() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L, 0L, null, 0) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -283,9 +283,9 @@ void inSequenceCommit() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 27L, 150L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 27L, 150L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L, 0L, null, 0) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -325,9 +325,9 @@ void outOfOrderCommit(int lastBatchSequence, int firstBatchSequence) { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L, 0L, null, 0) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -357,9 +357,9 @@ void outOfOrderCommitNewEpoch() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L, 0L, null, 0) ); // The file will be deleted because its only batch is rejected. 
@@ -389,9 +389,9 @@ void invalidProducerEpoch() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L, 0L, null, 0) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DBUtils.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DBUtils.java index 4b532737b5..770cbd7f6c 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DBUtils.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DBUtils.java @@ -18,6 +18,8 @@ package io.aiven.inkless.control_plane.postgres; +import org.apache.kafka.common.Uuid; + import com.zaxxer.hikari.HikariDataSource; import org.jooq.DSLContext; @@ -26,17 +28,20 @@ import org.jooq.generated.tables.records.BatchesRecord; import org.jooq.generated.tables.records.FilesRecord; import org.jooq.generated.tables.records.LogsRecord; +import org.jooq.generated.tables.records.ProducerStateRecord; import org.jooq.impl.DSL; import org.jooq.impl.TableImpl; import java.sql.Connection; import java.sql.SQLException; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; import static org.jooq.generated.Tables.BATCHES; import static org.jooq.generated.Tables.FILES; import static org.jooq.generated.Tables.LOGS; +import static org.jooq.generated.Tables.PRODUCER_STATE; import static org.jooq.impl.DSL.asterisk; public class DBUtils { @@ -52,6 +57,52 @@ static Set getAllBatches(final HikariDataSource hikariDataSource) return getAll(hikariDataSource, BATCHES, BatchesRecord.class); } + static List getAllProducerState(final HikariDataSource hikariDataSource) { + try (final Connection connection = hikariDataSource.getConnection()) { + final DSLContext ctx = DSL.using(connection, SQLDialect.POSTGRES); + return ctx.select(asterisk()) + .from(PRODUCER_STATE) + .orderBy(PRODUCER_STATE.TOPIC_ID, PRODUCER_STATE.PARTITION, PRODUCER_STATE.PRODUCER_ID, PRODUCER_STATE.ROW_ID) + .fetchInto(ProducerStateRecord.class); + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } + + static Integer getLeaderEpochAtInit(final HikariDataSource hikariDataSource, final Uuid topicId, final int partition) { + try (final Connection connection = hikariDataSource.getConnection()) { + final DSLContext ctx = DSL.using(connection, SQLDialect.POSTGRES); + return ctx.select(LOGS.LEADER_EPOCH_AT_INIT) + .from(LOGS) + .where(LOGS.TOPIC_ID.eq(topicId) + .and(LOGS.PARTITION.eq(partition))) + .fetchOne(LOGS.LEADER_EPOCH_AT_INIT); + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } + + /** + * Simulates messages being appended by updating the high_watermark. + * This makes diskless_start_offset < high_watermark, indicating the log is no longer in migration phase. 
+ */ + static void simulateMessagesAppended(final HikariDataSource hikariDataSource, final Uuid topicId, final int partition, final long newHighWatermark) { + try (final Connection connection = hikariDataSource.getConnection()) { + final DSLContext ctx = DSL.using(connection, SQLDialect.POSTGRES); + final int rowsUpdated = ctx.update(LOGS) + .set(LOGS.HIGH_WATERMARK, newHighWatermark) + .where(LOGS.TOPIC_ID.eq(topicId) + .and(LOGS.PARTITION.eq(partition))) + .execute(); + if (rowsUpdated == 0) { + throw new RuntimeException("No rows updated - log entry not found for topic " + topicId + " partition " + partition); + } + connection.commit(); + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } + private static Set getAll(final HikariDataSource hikariDataSource, final TableImpl table, final Class recordClass) { diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteRecordsJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteRecordsJobTest.java index b6eb541544..2b960fe1d4 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteRecordsJobTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteRecordsJobTest.java @@ -185,10 +185,10 @@ void deleteRecordsFromMultipleTopics(final List order) { assertThat(responses).containsExactlyElementsOf(expectedResponses); assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 18L, 36L, (long) file2Batch1Size + file3Batch1Size), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 24L, 24L, 0L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_2, 0, TOPIC_2, 0L, 24L, (long) file2Batch2Size + file3Batch3Size) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 18L, 36L, (long) file2Batch1Size + file3Batch1Size, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 24L, 24L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_2, 0, TOPIC_2, 0L, 24L, (long) file2Batch2Size + file3Batch3Size, 0L, null, 0) ); assertThat(DBUtils.getAllBatches(pgContainer.getDataSource())).containsExactlyInAnyOrder( diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteTopicJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteTopicJobTest.java index 522d8d6592..355ddfbf55 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteTopicJobTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteTopicJobTest.java @@ -149,7 +149,7 @@ void deleteMultipleTopics() { // The logs of the deleted topics must be gone, i.e. only TOPIC_2 remains. assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactly( - new LogsRecord(TOPIC_ID_2, 0, TOPIC_2, 0L, 24L, (long) file2Batch2Size + file3Batch3Size) + new LogsRecord(TOPIC_ID_2, 0, TOPIC_2, 0L, 24L, (long) file2Batch2Size + file3Batch3Size, 0L, null, 0) ); // The batches of the deleted topics must be gone, i.e. only TOPIC_2 remains. 
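A short sketch (hypothetical test, not part of the patch) of how simulateMessagesAppended above is meant to combine with the init job to exercise the already_initialized guard; it assumes the constants and static imports of the test classes below:

    @Test
    void reinitAfterAppendsIsRejected() {
        final Set<InitDisklessLogRequest> init = Set.of(
            new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 0L, 100L, 1, List.of()));
        new InitDisklessLogJob(Time.SYSTEM, pgContainer.getJooqCtx(), init, durationMs -> {}).call();
        // Move the high watermark past diskless_start_offset, as diskless appends would.
        DBUtils.simulateMessagesAppended(pgContainer.getDataSource(), TOPIC_ID1, 0, 150L);
        assertThatThrownBy(() ->
            new InitDisklessLogJob(Time.SYSTEM, pgContainer.getJooqCtx(), init, durationMs -> {}).call())
            .isInstanceOf(DisklessLogAlreadyInitializedException.class);
    }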
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJobTest.java new file mode 100644 index 0000000000..941a5586b3 --- /dev/null +++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJobTest.java @@ -0,0 +1,203 @@ +/* + * Inkless + * Copyright (C) 2026 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.control_plane.postgres; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Time; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.util.List; +import java.util.Set; + +import io.aiven.inkless.control_plane.GetDisklessLogRequest; +import io.aiven.inkless.control_plane.GetDisklessLogResponse; +import io.aiven.inkless.control_plane.InitDisklessLogRequest; +import io.aiven.inkless.test_utils.InklessPostgreSQLContainer; +import io.aiven.inkless.test_utils.PostgreSQLTestContainer; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) +class GetDisklessLogJobTest { + @Container + static final InklessPostgreSQLContainer pgContainer = PostgreSQLTestContainer.container(); + + static final String TOPIC_1 = "topic1"; + static final Uuid TOPIC_ID1 = new Uuid(10, 12); + static final String TOPIC_2 = "topic2"; + static final Uuid TOPIC_ID2 = new Uuid(20, 24); + + @BeforeEach + void setUp(final TestInfo testInfo) { + pgContainer.createDatabase(testInfo); + pgContainer.migrate(); + } + + @AfterEach + void tearDown() { + pgContainer.tearDown(); + } + + @Test + void emptyRequestList() throws Exception { + final GetDisklessLogJob job = new GetDisklessLogJob( + Time.SYSTEM, pgContainer.getJooqCtx(), List.of(), durationMs -> {}); + final List responses = job.call(); + assertThat(responses).isEmpty(); + } + + @Test + void unknownTopicOrPartition() throws Exception { + // Query for a partition that doesn't exist + final List requests = List.of( + new GetDisklessLogRequest(TOPIC_ID1, 0) + ); + final GetDisklessLogJob job = new GetDisklessLogJob( + Time.SYSTEM, pgContainer.getJooqCtx(), requests, durationMs -> {}); + final List responses = job.call(); + + assertThat(responses).hasSize(1); + final GetDisklessLogResponse response = responses.get(0); + 
assertThat(response.topicId()).isEqualTo(TOPIC_ID1); + assertThat(response.partition()).isEqualTo(0); + assertThat(response.error()).isEqualTo(Errors.UNKNOWN_TOPIC_OR_PARTITION); + assertThat(response.logStartOffset()).isEqualTo(GetDisklessLogResponse.INVALID_OFFSET); + assertThat(response.highWatermark()).isEqualTo(GetDisklessLogResponse.INVALID_OFFSET); + assertThat(response.disklessStartOffset()).isEqualTo(GetDisklessLogResponse.INVALID_OFFSET); + } + + @Test + void getExistingLog() throws Exception { + // First, create a log entry + final Set initRequests = Set.of( + new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 200L, 5, List.of()) + ); + new InitDisklessLogJob(Time.SYSTEM, pgContainer.getJooqCtx(), initRequests, durationMs -> {}).call(); + + // Now query for it + final List requests = List.of( + new GetDisklessLogRequest(TOPIC_ID1, 0) + ); + final GetDisklessLogJob job = new GetDisklessLogJob( + Time.SYSTEM, pgContainer.getJooqCtx(), requests, durationMs -> {}); + final List responses = job.call(); + + assertThat(responses).hasSize(1); + final GetDisklessLogResponse response = responses.get(0); + assertThat(response.topicId()).isEqualTo(TOPIC_ID1); + assertThat(response.partition()).isEqualTo(0); + assertThat(response.error()).isEqualTo(Errors.NONE); + assertThat(response.logStartOffset()).isEqualTo(100L); + assertThat(response.highWatermark()).isEqualTo(200L); + // disklessStartOffset is set to highWatermark when initialized + assertThat(response.disklessStartOffset()).isEqualTo(200L); + } + + @Test + void getMultiplePartitions() throws Exception { + // Create multiple log entries + final Set initRequests = Set.of( + new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 200L, 5, List.of()), + new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 1, 50L, 150L, 3, List.of()), + new InitDisklessLogRequest(TOPIC_ID2, TOPIC_2, 0, 0L, 500L, 1, List.of()) + ); + new InitDisklessLogJob(Time.SYSTEM, pgContainer.getJooqCtx(), initRequests, durationMs -> {}).call(); + + // Query for all of them + final List requests = List.of( + new GetDisklessLogRequest(TOPIC_ID1, 0), + new GetDisklessLogRequest(TOPIC_ID1, 1), + new GetDisklessLogRequest(TOPIC_ID2, 0) + ); + final GetDisklessLogJob job = new GetDisklessLogJob( + Time.SYSTEM, pgContainer.getJooqCtx(), requests, durationMs -> {}); + final List responses = job.call(); + + assertThat(responses).hasSize(3); + + // Check topic1 partition 0 + final GetDisklessLogResponse topic1Part0 = responses.stream() + .filter(r -> r.topicId().equals(TOPIC_ID1) && r.partition() == 0) + .findFirst() + .orElseThrow(); + assertThat(topic1Part0.error()).isEqualTo(Errors.NONE); + assertThat(topic1Part0.logStartOffset()).isEqualTo(100L); + assertThat(topic1Part0.highWatermark()).isEqualTo(200L); + assertThat(topic1Part0.disklessStartOffset()).isEqualTo(200L); + + // Check topic1 partition 1 + final GetDisklessLogResponse topic1Part1 = responses.stream() + .filter(r -> r.topicId().equals(TOPIC_ID1) && r.partition() == 1) + .findFirst() + .orElseThrow(); + assertThat(topic1Part1.error()).isEqualTo(Errors.NONE); + assertThat(topic1Part1.logStartOffset()).isEqualTo(50L); + assertThat(topic1Part1.highWatermark()).isEqualTo(150L); + assertThat(topic1Part1.disklessStartOffset()).isEqualTo(150L); + + // Check topic2 partition 0 + final GetDisklessLogResponse topic2Part0 = responses.stream() + .filter(r -> r.topicId().equals(TOPIC_ID2) && r.partition() == 0) + .findFirst() + .orElseThrow(); + assertThat(topic2Part0.error()).isEqualTo(Errors.NONE); + 
+        assertThat(topic2Part0.logStartOffset()).isEqualTo(0L);
+        assertThat(topic2Part0.highWatermark()).isEqualTo(500L);
+        assertThat(topic2Part0.disklessStartOffset()).isEqualTo(500L);
+    }
+
+    @Test
+    void duplicateRequestsReturnDuplicateResponses() throws Exception {
+        // Create a log entry
+        final Set<InitDisklessLogRequest> initRequests = Set.of(
+            new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 200L, 5, List.of())
+        );
+        new InitDisklessLogJob(Time.SYSTEM, pgContainer.getJooqCtx(), initRequests, durationMs -> {}).call();
+
+        // Query for the same partition twice
+        final List<GetDisklessLogRequest> requests = List.of(
+            new GetDisklessLogRequest(TOPIC_ID1, 0),
+            new GetDisklessLogRequest(TOPIC_ID1, 0)
+        );
+        final GetDisklessLogJob job = new GetDisklessLogJob(
+            Time.SYSTEM, pgContainer.getJooqCtx(), requests, durationMs -> {});
+        final List<GetDisklessLogResponse> responses = job.call();
+
+        assertThat(responses).hasSize(2);
+        for (final GetDisklessLogResponse response : responses) {
+            assertThat(response.topicId()).isEqualTo(TOPIC_ID1);
+            assertThat(response.partition()).isEqualTo(0);
+            assertThat(response.error()).isEqualTo(Errors.NONE);
+            assertThat(response.logStartOffset()).isEqualTo(100L);
+            assertThat(response.highWatermark()).isEqualTo(200L);
+        }
+    }
+}
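
The tests in GetDisklessLogJobTest pin down the read-side contract: a missing partition is reported as Errors.UNKNOWN_TOPIC_OR_PARTITION with every offset set to GetDisklessLogResponse.INVALID_OFFSET, while an initialized partition is reported as Errors.NONE with disklessStartOffset equal to the high watermark captured at initialization. A minimal caller-side sketch of that contract follows; the class and method names are illustrative and not part of this patch.

    import org.apache.kafka.common.protocol.Errors;

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    import io.aiven.inkless.control_plane.GetDisklessLogResponse;

    // Hypothetical helper: only the GetDisklessLogResponse accessors used here
    // are defined by this patch.
    final class DisklessLogResponses {
        private DisklessLogResponses() {}

        // True when the control plane already tracks a diskless log for the partition.
        static boolean isInitialized(final GetDisklessLogResponse response) {
            return response.error() == Errors.NONE
                && response.disklessStartOffset() != GetDisklessLogResponse.INVALID_OFFSET;
        }

        // Splits a batch of responses into initialized (true) and unknown/uninitialized (false).
        static Map<Boolean, List<GetDisklessLogResponse>> byInitialized(final List<GetDisklessLogResponse> responses) {
            return responses.stream()
                .collect(Collectors.partitioningBy(DisklessLogResponses::isInitialized));
        }
    }
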
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/InitDisklessLogJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/InitDisklessLogJobTest.java
new file mode 100644
index 0000000000..e4b4eb378c
--- /dev/null
+++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/InitDisklessLogJobTest.java
@@ -0,0 +1,229 @@
+/*
+ * Inkless
+ * Copyright (C) 2026 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane.postgres;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Time;
+
+import org.jooq.generated.tables.records.ProducerStateRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.util.List;
+import java.util.Set;
+
+import io.aiven.inkless.control_plane.DisklessLogAlreadyInitializedException;
+import io.aiven.inkless.control_plane.InitDisklessLogRequest;
+import io.aiven.inkless.control_plane.ProducerStateSnapshot;
+import io.aiven.inkless.control_plane.StaleLeaderEpochException;
+import io.aiven.inkless.test_utils.InklessPostgreSQLContainer;
+import io.aiven.inkless.test_utils.PostgreSQLTestContainer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+@Testcontainers
+class InitDisklessLogJobTest {
+    @Container
+    static final InklessPostgreSQLContainer pgContainer = PostgreSQLTestContainer.container();
+
+    static final String TOPIC_1 = "topic1";
+    static final Uuid TOPIC_ID1 = new Uuid(10, 12);
+
+    @BeforeEach
+    void setUp(final TestInfo testInfo) {
+        pgContainer.createDatabase(testInfo);
+        pgContainer.migrate();
+    }
+
+    @AfterEach
+    void tearDown() {
+        pgContainer.tearDown();
+    }
+
+    private void runInitJob(final Set<InitDisklessLogRequest> requests) {
+        new InitDisklessLogJob(Time.SYSTEM, pgContainer.getJooqCtx(), requests, durationMs -> {}).call();
+    }
+
+    @Test
+    void initializeNewLog() {
+        final List<ProducerStateSnapshot> producerStateEntries = List.of(
+            new ProducerStateSnapshot(1001L, (short) 1, 0, 9, 100L, 1000L),
+            new ProducerStateSnapshot(1002L, (short) 2, 0, 4, 110L, 2000L)
+        );
+        runInitJob(Set.of(
+            new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 200L, 5, producerStateEntries)
+        ));
+
+        // Verify log was created
+        final var logs = DBUtils.getAllLogs(pgContainer.getDataSource());
+        assertThat(logs).hasSize(1);
+        final var log = logs.iterator().next();
+        assertThat(log.getTopicId()).isEqualTo(TOPIC_ID1);
+        assertThat(log.getPartition()).isEqualTo(0);
+        assertThat(log.getTopicName()).isEqualTo(TOPIC_1);
+        assertThat(log.getLogStartOffset()).isEqualTo(100L);
+        assertThat(log.getHighWatermark()).isEqualTo(200L);
+        assertThat(log.getDisklessStartOffset()).isEqualTo(200L);
+        assertThat(DBUtils.getLeaderEpochAtInit(pgContainer.getDataSource(), TOPIC_ID1, 0)).isEqualTo(5);
+
+        // Verify producer state was created
+        final var producerStates = DBUtils.getAllProducerState(pgContainer.getDataSource());
+        assertThat(producerStates).hasSize(2);
+
+        final ProducerStateRecord state1 = producerStates.stream()
+            .filter(s -> s.getProducerId() == 1001L)
+            .findFirst()
+            .orElseThrow();
+        assertThat(state1.getProducerEpoch()).isEqualTo((short) 1);
+        assertThat(state1.getBaseSequence()).isEqualTo(0);
+        assertThat(state1.getLastSequence()).isEqualTo(9);
+        assertThat(state1.getAssignedOffset()).isEqualTo(100L);
+        assertThat(state1.getBatchMaxTimestamp()).isEqualTo(1000L);
+
+        final ProducerStateRecord state2 = producerStates.stream()
+            .filter(s -> s.getProducerId() == 1002L)
+            .findFirst()
+            .orElseThrow();
+        assertThat(state2.getProducerEpoch()).isEqualTo((short) 2);
+        assertThat(state2.getBaseSequence()).isEqualTo(0);
+        assertThat(state2.getLastSequence()).isEqualTo(4);
+        assertThat(state2.getAssignedOffset()).isEqualTo(110L);
+        assertThat(state2.getBatchMaxTimestamp()).isEqualTo(2000L);
+    }
+
+    @Test
+    void throwsExceptionWhenStaleLeaderEpoch() {
+        // First initialization with leader epoch 5
+        runInitJob(Set.of(new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 200L, 5, List.of())));
+
+        // Second initialization attempt with a lower epoch (should throw StaleLeaderEpochException)
+        final var secondRequests = Set.of(new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 300L, 400L, 3, List.of()));
+        assertThatThrownBy(() -> runInitJob(secondRequests))
+            .isInstanceOf(StaleLeaderEpochException.class)
+            .satisfies(e -> {
+                final StaleLeaderEpochException ex = (StaleLeaderEpochException) e;
+                assertThat(ex.topicId()).isEqualTo(TOPIC_ID1);
+                assertThat(ex.partition()).isEqualTo(0);
+                assertThat(ex.requestedEpoch()).isEqualTo(3);
+            });
+
+        // Verify the log still has the original values
+        final var logs = DBUtils.getAllLogs(pgContainer.getDataSource());
+        assertThat(logs).hasSize(1);
+        final var log = logs.iterator().next();
+        assertThat(log.getLogStartOffset()).isEqualTo(100L);
+        assertThat(log.getHighWatermark()).isEqualTo(200L);
+        assertThat(DBUtils.getLeaderEpochAtInit(pgContainer.getDataSource(), TOPIC_ID1, 0)).isEqualTo(5);
+    }
+
+    @Test
+    void throwsExceptionWhenMessagesAppended() {
+        // First initialization with leader epoch 5
+        final var producerState = List.of(new ProducerStateSnapshot(1001L, (short) 1, 0, 9, 100L, 1000L));
+        runInitJob(Set.of(new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 200L, 5, producerState)));
+
+        // Simulate messages being appended (high_watermark moves past diskless_start_offset)
+        DBUtils.simulateMessagesAppended(pgContainer.getDataSource(), TOPIC_ID1, 0, 300L);
+
+        // A second initialization with a higher leader epoch should fail because messages have been appended
+        final var secondRequests = Set.of(new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 300L, 400L, 7, List.of()));
+        assertThatThrownBy(() -> runInitJob(secondRequests))
+            .isInstanceOf(DisklessLogAlreadyInitializedException.class)
+            .satisfies(e -> {
+                final var ex = (DisklessLogAlreadyInitializedException) e;
+                assertThat(ex.topicId()).isEqualTo(TOPIC_ID1);
+                assertThat(ex.partition()).isEqualTo(0);
+            });
+
+        // Verify the log still has the original values (except high_watermark, which was updated by the simulation)
+        final var log = DBUtils.getAllLogs(pgContainer.getDataSource()).iterator().next();
+        assertThat(log.getLogStartOffset()).isEqualTo(100L);
+        assertThat(log.getHighWatermark()).isEqualTo(300L);
+        assertThat(log.getDisklessStartOffset()).isEqualTo(200L);
+        assertThat(DBUtils.getLeaderEpochAtInit(pgContainer.getDataSource(), TOPIC_ID1, 0)).isEqualTo(5);
+
+        // Verify producer state was not changed
+        assertThat(DBUtils.getAllProducerState(pgContainer.getDataSource()))
+            .hasSize(1)
+            .extracting(ProducerStateRecord::getProducerId)
+            .containsExactly(1001L);
+    }
+
+    @Test
+    void updateLogInMigrationPhase() {
+        // First initialization with leader epoch 5
+        final var producerState = List.of(new ProducerStateSnapshot(1001L, (short) 1, 0, 9, 100L, 1000L));
+        runInitJob(Set.of(new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 200L, 5, producerState)));
+        assertThat(DBUtils.getAllProducerState(pgContainer.getDataSource())).hasSize(1);
+
+        // Second initialization with an equal leader epoch while still in the migration phase - should succeed
+        runInitJob(Set.of(new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 150L, 250L, 5, List.of())));
+
+        // Verify the log was updated and producer state was cleared
+        var log = DBUtils.getAllLogs(pgContainer.getDataSource()).iterator().next();
+        assertThat(log.getLogStartOffset()).isEqualTo(150L);
+        assertThat(log.getHighWatermark()).isEqualTo(250L);
+        assertThat(log.getDisklessStartOffset()).isEqualTo(250L);
+        assertThat(DBUtils.getLeaderEpochAtInit(pgContainer.getDataSource(), TOPIC_ID1, 0)).isEqualTo(5);
+        assertThat(DBUtils.getAllProducerState(pgContainer.getDataSource())).isEmpty();
+
+        // Third initialization with a higher leader epoch while still in the migration phase - should succeed
+        final var newProducerState = List.of(
+            new ProducerStateSnapshot(2001L, (short) 3, 0, 14, 300L, 3000L),
+            new ProducerStateSnapshot(2002L, (short) 1, 5, 10, 320L, 3200L)
+        );
+        runInitJob(Set.of(new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 300L, 400L, 7, newProducerState)));
+
+        // Verify the log was updated
+        log = DBUtils.getAllLogs(pgContainer.getDataSource()).iterator().next();
+        assertThat(log.getLogStartOffset()).isEqualTo(300L);
+        assertThat(log.getHighWatermark()).isEqualTo(400L);
+        assertThat(log.getDisklessStartOffset()).isEqualTo(400L);
+        assertThat(DBUtils.getLeaderEpochAtInit(pgContainer.getDataSource(), TOPIC_ID1, 0)).isEqualTo(7);
+
+        // Verify producer state was inserted
+        assertThat(DBUtils.getAllProducerState(pgContainer.getDataSource()))
+            .hasSize(2)
+            .extracting(ProducerStateRecord::getProducerId)
+            .containsExactlyInAnyOrder(2001L, 2002L);
+    }
+
+    @Test
+    void multiplePartitionsCanBeInitialized() {
+        runInitJob(Set.of(
+            new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 200L, 5, List.of()),
+            new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 1, 50L, 100L, 3, List.of())
+        ));
+
+        final var logs = DBUtils.getAllLogs(pgContainer.getDataSource());
+        assertThat(logs).hasSize(2);
+
+        final var partition0 = logs.stream().filter(l -> l.getPartition() == 0).findFirst().orElseThrow();
+        assertThat(partition0.getLogStartOffset()).isEqualTo(100L);
+        assertThat(partition0.getHighWatermark()).isEqualTo(200L);
+
+        final var partition1 = logs.stream().filter(l -> l.getPartition() == 1).findFirst().orElseThrow();
+        assertThat(partition1.getLogStartOffset()).isEqualTo(50L);
+        assertThat(partition1.getHighWatermark()).isEqualTo(100L);
+    }
+}
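
Taken together, InitDisklessLogJobTest fixes the rule for when a repeated initialization is accepted: a request carrying a leader epoch lower than the stored one fails with StaleLeaderEpochException; a request against a partition whose high watermark has already advanced past diskless_start_offset fails with DisklessLogAlreadyInitializedException, since diskless writes have happened and overwriting would lose them; any other request (equal or higher epoch while still in the migration phase) replaces the offsets and producer state wholesale. Below is a standalone restatement of that rule, assuming the stored epoch and offsets have already been read from the logs table; the authoritative check lives in InitDisklessLogJob's SQL, and this class is illustrative only.

    // Hypothetical restatement of the acceptance rule exercised by the tests above.
    final class InitDisklessLogRule {
        enum Outcome {
            APPLY,          // still migrating: overwrite offsets, replace producer state
            STALE_EPOCH,    // surfaced as StaleLeaderEpochException
            ALREADY_ACTIVE  // surfaced as DisklessLogAlreadyInitializedException
        }

        private InitDisklessLogRule() {}

        static Outcome classify(final long highWatermark,
                                final long disklessStartOffset,
                                final int storedEpoch,
                                final int requestedEpoch) {
            if (requestedEpoch < storedEpoch) {
                return Outcome.STALE_EPOCH;
            }
            if (highWatermark > disklessStartOffset) {
                return Outcome.ALREADY_ACTIVE;
            }
            return Outcome.APPLY;
        }
    }

For instance, classify(300L, 200L, 5, 7) yields ALREADY_ACTIVE, matching throwsExceptionWhenMessagesAppended, while classify(200L, 200L, 5, 5) yields APPLY, matching updateLogInMigrationPhase.
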
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJobTest.java
index 7e818812c9..6a98ccbc9f 100644
--- a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJobTest.java
+++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJobTest.java
@@ -96,18 +96,18 @@ void createTopicsAndPartition() {
         final TopicsAndPartitionsCreateJob job1 = new TopicsAndPartitionsCreateJob(Time.SYSTEM, pgContainer.getJooqCtx(), createTopicAndPartitionsRequests, durationMs -> {});
         job1.run();
         assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
-            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L)
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L, 0L, null, 0),
+            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L, 0L, null, 0),
+            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L, 0L, null, 0)
         );
 
         // Repetition doesn't affect anything.
         final TopicsAndPartitionsCreateJob job2 = new TopicsAndPartitionsCreateJob(Time.SYSTEM, pgContainer.getJooqCtx(), createTopicAndPartitionsRequests, durationMs -> {});
         job2.run();
         assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
-            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L)
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L, 0L, null, 0),
+            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L, 0L, null, 0),
+            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L, 0L, null, 0)
         );
     }
 
@@ -128,10 +128,10 @@ void createPartitionAfterTopic() {
         job2.run();
 
         assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
-            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID2, 1, TOPIC_2, 0L, 0L, 0L)
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L, 0L, null, 0),
+            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L, 0L, null, 0),
+            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L, 0L, null, 0),
+            new LogsRecord(TOPIC_ID2, 1, TOPIC_2, 0L, 0L, 0L, 0L, null, 0)
         );
     }
 
@@ -157,9 +157,9 @@ void existingRecordsNotAffected() throws SQLException {
         job1.run();
 
         assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
-            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 101L, 201L, 999L), // unaffected
-            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 102L, 202L, 1999L) // unaffected
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 101L, 201L, 999L, 0L, null, 0), // unaffected
+            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L, 0L, null, 0),
+            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 102L, 202L, 1999L, 0L, null, 0) // unaffected
         );
     }
 }
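
For orientation, this is how the pieces introduced by the patch fit together when migrating a partition from classic to diskless storage. The driver below is hypothetical (no such class ships here) and assumes an already configured ControlPlane; the two ControlPlane methods, the request and response types, and ProducerStateSnapshot are the ones added by this patch.

    import org.apache.kafka.common.Uuid;

    import java.util.List;
    import java.util.Set;

    import io.aiven.inkless.control_plane.ControlPlane;
    import io.aiven.inkless.control_plane.GetDisklessLogRequest;
    import io.aiven.inkless.control_plane.GetDisklessLogResponse;
    import io.aiven.inkless.control_plane.InitDisklessLogRequest;
    import io.aiven.inkless.control_plane.ProducerStateSnapshot;

    // Hypothetical migration driver, for illustration only.
    final class DisklessMigrationExample {
        private DisklessMigrationExample() {}

        static GetDisklessLogResponse migratePartition(final ControlPlane controlPlane,
                                                       final Uuid topicId,
                                                       final String topicName,
                                                       final int partition,
                                                       final long logStartOffset,
                                                       final long highWatermark,
                                                       final int leaderEpoch,
                                                       final List<ProducerStateSnapshot> producerState) {
            // Transfer the classic log's offsets and producer state to the control plane.
            // A StaleLeaderEpochException here means a newer leader initialized the
            // partition first; a DisklessLogAlreadyInitializedException means diskless
            // writes have already happened.
            controlPlane.initDisklessLog(Set.of(new InitDisklessLogRequest(
                topicId, topicName, partition, logStartOffset, highWatermark, leaderEpoch, producerState)));

            // Read the registered state back: disklessStartOffset is expected to equal
            // the high watermark passed above.
            return controlPlane.getDisklessLog(List.of(new GetDisklessLogRequest(topicId, partition))).get(0);
        }
    }
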