diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ControlPlane.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ControlPlane.java
index a008b02154..8d132d2a2a 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ControlPlane.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ControlPlane.java
@@ -55,6 +55,8 @@ List<FindBatchResponse> findBatches(

     void createTopicAndPartitions(Set<CreateTopicAndPartitionsRequest> requests);

+    void initDisklessLog(Set<InitDisklessLogRequest> requests);
+
     List<DeleteRecordsResponse> deleteRecords(List<DeleteRecordsRequest> requests);

     void deleteTopics(Set<Uuid> topicIds);
@@ -94,6 +96,8 @@ static ControlPlane create(final InklessConfig config, final Time time) {

     boolean isSafeToDeleteFile(String objectKeyPath);

+    List<GetDisklessLogResponse> getDisklessLog(List<GetDisklessLogRequest> requests);
+
     // used for testing purposes only
     List<GetLogInfoResponse> getLogInfo(List<GetLogInfoRequest> requests);
 }
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/DisklessLogAlreadyInitializedException.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/DisklessLogAlreadyInitializedException.java
new file mode 100644
index 0000000000..e242b27a4b
--- /dev/null
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/DisklessLogAlreadyInitializedException.java
@@ -0,0 +1,44 @@
+/*
+ * Inkless
+ * Copyright (C) 2026 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane;
+
+import org.apache.kafka.common.Uuid;
+
+/**
+ * Exception thrown when attempting to initialize a diskless log that has already been initialized.
+ */
+public class DisklessLogAlreadyInitializedException extends ControlPlaneException {
+
+    private final Uuid topicId;
+    private final int partition;
+
+    public DisklessLogAlreadyInitializedException(final Uuid topicId, final int partition) {
+        super(String.format("Diskless log already initialized for topic %s partition %d",
+            topicId, partition));
+        this.topicId = topicId;
+        this.partition = partition;
+    }
+
+    public Uuid topicId() {
+        return topicId;
+    }
+
+    public int partition() {
+        return partition;
+    }
+}
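Editor's note: a minimal caller-side sketch of how the two rejection modes above might be handled during a migration attempt. `InitDisklessLogCaller`, `controlPlane`, and the surrounding wiring are illustrative assumptions, not part of this change.

    // Hypothetical usage sketch (not part of this diff): reacting to the two
    // rejection modes when initializing a partition for diskless mode.
    import java.util.Set;

    import io.aiven.inkless.control_plane.ControlPlane;
    import io.aiven.inkless.control_plane.DisklessLogAlreadyInitializedException;
    import io.aiven.inkless.control_plane.InitDisklessLogRequest;
    import io.aiven.inkless.control_plane.StaleLeaderEpochException;

    final class InitDisklessLogCaller {
        static void tryInit(final ControlPlane controlPlane, final InitDisklessLogRequest request) {
            try {
                controlPlane.initDisklessLog(Set.of(request));
            } catch (final StaleLeaderEpochException e) {
                // A leader with a higher epoch already initialized this partition:
                // refresh leadership state instead of retrying with the same epoch.
            } catch (final DisklessLogAlreadyInitializedException e) {
                // Diskless appends already happened; re-initializing would lose data,
                // so the migration step is skipped for this partition.
            }
        }
    }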
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogRequest.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogRequest.java
new file mode 100644
index 0000000000..2855735853
--- /dev/null
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogRequest.java
@@ -0,0 +1,24 @@
+/*
+ * Inkless
+ * Copyright (C) 2026 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane;
+
+import org.apache.kafka.common.Uuid;
+
+public record GetDisklessLogRequest(Uuid topicId,
+                                    int partition) {
+}
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogResponse.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogResponse.java
new file mode 100644
index 0000000000..a69663e44f
--- /dev/null
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogResponse.java
@@ -0,0 +1,42 @@
+/*
+ * Inkless
+ * Copyright (C) 2026 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+
+public record GetDisklessLogResponse(Uuid topicId,
+                                     int partition,
+                                     Errors error,
+                                     long logStartOffset,
+                                     long highWatermark,
+                                     Long disklessStartOffset) {
+    public static final long INVALID_OFFSET = -1L;
+
+    public static GetDisklessLogResponse success(final Uuid topicId,
+                                                 final int partition,
+                                                 final long logStartOffset,
+                                                 final long highWatermark,
+                                                 final Long disklessStartOffset) {
+        return new GetDisklessLogResponse(topicId, partition, Errors.NONE, logStartOffset, highWatermark, disklessStartOffset);
+    }
+
+    public static GetDisklessLogResponse unknownTopicOrPartition(final Uuid topicId, final int partition) {
+        return new GetDisklessLogResponse(topicId, partition, Errors.UNKNOWN_TOPIC_OR_PARTITION, INVALID_OFFSET, INVALID_OFFSET, INVALID_OFFSET);
+    }
+}
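Editor's note: a small sketch of the factory methods just defined, assuming only the types above; it shows how the INVALID_OFFSET sentinel accompanies the UNKNOWN_TOPIC_OR_PARTITION error while success carries real offsets.

    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.protocol.Errors;

    import io.aiven.inkless.control_plane.GetDisklessLogResponse;

    final class GetDisklessLogResponseDemo {
        public static void main(final String[] args) {
            final Uuid topicId = new Uuid(10, 12);  // arbitrary example id
            final GetDisklessLogResponse ok =
                GetDisklessLogResponse.success(topicId, 0, 100L, 200L, 200L);
            final GetDisklessLogResponse missing =
                GetDisklessLogResponse.unknownTopicOrPartition(topicId, 1);
            System.out.println(ok.error() == Errors.NONE);                                        // true
            System.out.println(missing.logStartOffset() == GetDisklessLogResponse.INVALID_OFFSET); // true
        }
    }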
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InMemoryControlPlane.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InMemoryControlPlane.java
index aa99e79e84..83e7acf5b7 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InMemoryControlPlane.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InMemoryControlPlane.java
@@ -90,6 +90,70 @@ public synchronized void createTopicAndPartitions(final Set<CreateTopicAndPartitionsRequest> requests) {
+    @Override
+    public synchronized void initDisklessLog(final Set<InitDisklessLogRequest> requests) {
+        for (final InitDisklessLogRequest request : requests) {
+            final TopicIdPartition topicIdPartition = new TopicIdPartition(
+                request.topicId(), request.partition(), request.topicName());
+
+            final LogInfo existingLogInfo = logs.get(topicIdPartition);
+            if (existingLogInfo != null) {
+                // Check if leader epoch is stale
+                if (request.leaderEpoch() < existingLogInfo.leaderEpochAtInit) {
+                    throw new StaleLeaderEpochException(
+                        request.topicId(), request.partition(), request.leaderEpoch());
+                }
+
+                // Check for invalid state: disklessStartOffset should never exceed highWatermark
+                if (existingLogInfo.disklessStartOffset > existingLogInfo.highWatermark) {
+                    throw new IllegalStateException(String.format(
+                        "Invalid state for %s: disklessStartOffset (%d) > highWatermark (%d)",
+                        topicIdPartition, existingLogInfo.disklessStartOffset, existingLogInfo.highWatermark));
+                }
+
+                // Check if messages have been appended (no longer in migration phase)
+                if (existingLogInfo.disklessStartOffset < existingLogInfo.highWatermark) {
+                    throw new DisklessLogAlreadyInitializedException(
+                        request.topicId(), request.partition());
+                }
+
+                // Still in migration phase with valid epoch - update existing log
+                LOGGER.info("Updating {} with logStartOffset {}, disklessStartOffset {}, leaderEpoch {}",
+                    topicIdPartition, request.logStartOffset(), request.disklessStartOffset(), request.leaderEpoch());
+                existingLogInfo.logStartOffset = request.logStartOffset();
+                existingLogInfo.highWatermark = request.disklessStartOffset();
+                existingLogInfo.disklessStartOffset = request.disklessStartOffset();
+                existingLogInfo.leaderEpochAtInit = request.leaderEpoch();
+
+                // Clear existing producer state for this partition
+                producers.remove(topicIdPartition);
+            } else {
+                // Create new log entry
+                LOGGER.info("Initializing {} with logStartOffset {}, disklessStartOffset {}, leaderEpoch {}",
+                    topicIdPartition, request.logStartOffset(), request.disklessStartOffset(), request.leaderEpoch());
+                final LogInfo logInfo = new LogInfo();
+                logInfo.logStartOffset = request.logStartOffset();
+                logInfo.highWatermark = request.disklessStartOffset();
+                logInfo.disklessStartOffset = request.disklessStartOffset();
+                logInfo.leaderEpochAtInit = request.leaderEpoch();
+                logs.put(topicIdPartition, logInfo);
+                batches.putIfAbsent(topicIdPartition, new TreeMap<>());
+            }
+
+            // Insert producer state entries
+            if (request.producerStateEntries() != null && !request.producerStateEntries().isEmpty()) {
+                final TreeMap<Long, LatestProducerState> partitionProducers =
+                    producers.computeIfAbsent(topicIdPartition, k -> new TreeMap<>());
+                for (final ProducerStateSnapshot entry : request.producerStateEntries()) {
+                    final LatestProducerState producerState = partitionProducers
+                        .computeIfAbsent(entry.producerId(), k -> LatestProducerState.empty(entry.producerEpoch()));
+                    producerState.addElement(entry.baseSequence(), entry.lastSequence(),
+                        entry.assignedOffset(), entry.batchMaxTimestamp());
+                }
+            }
+        }
+    }
+
     @Override
     protected synchronized Iterator commitFileForValidRequests(
         final String objectKey,
@@ -644,6 +708,27 @@ public boolean isSafeToDeleteFile(String objectKeyPath) {
         return !files.containsKey(objectKeyPath);
     }

+    @Override
+    public synchronized List<GetDisklessLogResponse> getDisklessLog(final List<GetDisklessLogRequest> requests) {
+        final List<GetDisklessLogResponse> result = new ArrayList<>();
+        for (final GetDisklessLogRequest request : requests) {
+            final TopicIdPartition tidp = findTopicIdPartition(request.topicId(), request.partition());
+            final LogInfo logInfo;
+            if (tidp == null || (logInfo = logs.get(tidp)) == null) {
+                result.add(GetDisklessLogResponse.unknownTopicOrPartition(request.topicId(), request.partition()));
+            } else {
+                result.add(GetDisklessLogResponse.success(
+                    request.topicId(),
+                    request.partition(),
+                    logInfo.logStartOffset,
+                    logInfo.highWatermark,
+                    logInfo.disklessStartOffset
+                ));
+            }
+        }
+        return result;
+    }
+
     @Override
     public synchronized List<GetLogInfoResponse> getLogInfo(final List<GetLogInfoRequest> requests) {
         final List<GetLogInfoResponse> result = new ArrayList<>();
@@ -679,6 +764,8 @@ private static class LogInfo {
         long logStartOffset = 0;
         long highWatermark = 0;
         long byteSize = 0;
+        Long disklessStartOffset = null;
+        int leaderEpochAtInit = 0;
     }

     private static class FileInfo {
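Editor's note: the branching above amounts to a small state machine keyed on how disklessStartOffset (dso) compares to highWatermark (hwm); a condensed, purely illustrative restatement of the same decision order:

    // Mirrors the checks in initDisklessLog above; names shortened for clarity.
    enum InitOutcome { CREATE, STALE_EPOCH, INVALID_STATE, ALREADY_INITIALIZED, REINITIALIZE }

    final class InitDecision {
        static InitOutcome decide(final boolean exists, final int requestEpoch,
                                  final int epochAtInit, final long dso, final long hwm) {
            if (!exists) return InitOutcome.CREATE;                         // first initialization
            if (requestEpoch < epochAtInit) return InitOutcome.STALE_EPOCH; // older leader, reject
            if (dso > hwm) return InitOutcome.INVALID_STATE;                // corrupted bookkeeping
            if (dso < hwm) return InitOutcome.ALREADY_INITIALIZED;          // diskless appends exist
            return InitOutcome.REINITIALIZE;                                // dso == hwm: still migrating
        }
    }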
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InitDisklessLogRequest.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InitDisklessLogRequest.java
new file mode 100644
index 0000000000..678a0f0fd9
--- /dev/null
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InitDisklessLogRequest.java
@@ -0,0 +1,109 @@
+/*
+ * Inkless
+ * Copyright (C) 2026 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.storage.internals.log.BatchMetadata;
+import org.apache.kafka.storage.internals.log.ProducerStateEntry;
+import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Request to initialize the diskless start offset for a partition.
+ *
+ * @param topicId The topic ID
+ * @param topicName The topic name
+ * @param partition The partition number
+ * @param logStartOffset The log start offset
+ * @param disklessStartOffset The diskless start offset (same as high watermark when transitioning)
+ * @param leaderEpoch The leader epoch of the broker making the request. Used to reject stale requests.
+ * @param producerStateEntries The producer state entries to store during initialization
+ */
+public record InitDisklessLogRequest(Uuid topicId,
+                                     String topicName,
+                                     int partition,
+                                     long logStartOffset,
+                                     long disklessStartOffset,
+                                     int leaderEpoch,
+                                     List<ProducerStateSnapshot> producerStateEntries) {
+
+    /**
+     * Creates an InitDisklessLogRequest from a UnifiedLog.
+     * @param log The UnifiedLog to extract information from
+     * @param leaderEpoch The current leader epoch for this partition
+     * @throws IllegalStateException if the log does not have a topic ID
+     */
+    public static InitDisklessLogRequest fromUnifiedLog(final UnifiedLog log, final int leaderEpoch) {
+        final Uuid topicId = log.topicId()
+            .orElseThrow(() -> new IllegalStateException("Topic ID is required for diskless initialization"));
+
+        final String topicName = log.topicPartition().topic();
+        final int partition = log.topicPartition().partition();
+        final long logStartOffset = log.logStartOffset();
+        final long highWatermark = log.highWatermark();
+
+        // Extract producer state entries
+        final List<ProducerStateSnapshot> producerStateEntries = extractProducerState(log.producerStateManager());
+
+        return new InitDisklessLogRequest(
+            topicId,
+            topicName,
+            partition,
+            logStartOffset,
+            highWatermark,
+            leaderEpoch,
+            producerStateEntries
+        );
+    }
+
+    /**
+     * Extracts the producer state from the ProducerStateManager.
+     * <p>
+ * For each active producer, this method extracts all retained batch metadata entries + * (up to {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN} per producer) to support + * duplicate detection after the transition to diskless. + * + * @return A list of ProducerStateSnapshot entries for all active producers + */ + private static List extractProducerState(final ProducerStateManager producerStateManager) { + final Map activeProducers = producerStateManager.activeProducers(); + final List snapshots = new ArrayList<>(); + + for (final Map.Entry entry : activeProducers.entrySet()) { + final long producerId = entry.getKey(); + final ProducerStateEntry state = entry.getValue(); + + for (final BatchMetadata batch : state.batchMetadata()) { + snapshots.add(new ProducerStateSnapshot( + producerId, + state.producerEpoch(), + batch.firstSeq(), + batch.lastSeq, + batch.firstOffset(), // assigned offset is the first offset of the batch + batch.timestamp + )); + } + } + + return snapshots; + } +} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ProducerStateSnapshot.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ProducerStateSnapshot.java new file mode 100644 index 0000000000..037f914f57 --- /dev/null +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ProducerStateSnapshot.java @@ -0,0 +1,40 @@ +/* + * Inkless + * Copyright (C) 2026 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.control_plane; + +/** + * Represents a single producer state entry to be stored in the control plane. + * This is used during the initialization of diskless log to transfer producer state + * from the local log to the control plane. + * + * @param producerId The producer ID + * @param producerEpoch The producer epoch + * @param baseSequence The base sequence number of the batch + * @param lastSequence The last sequence number of the batch + * @param assignedOffset The offset assigned to this batch + * @param batchMaxTimestamp The maximum timestamp in the batch + */ +public record ProducerStateSnapshot( + long producerId, + short producerEpoch, + int baseSequence, + int lastSequence, + long assignedOffset, + long batchMaxTimestamp +) { +} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/StaleLeaderEpochException.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/StaleLeaderEpochException.java new file mode 100644 index 0000000000..089d43b51c --- /dev/null +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/StaleLeaderEpochException.java @@ -0,0 +1,51 @@ +/* + * Inkless + * Copyright (C) 2026 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. 
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane;
+
+import org.apache.kafka.common.Uuid;
+
+/**
+ * Exception thrown when an operation is rejected because the provided leader epoch
+ * is stale (less than the leader epoch currently stored in the control plane).
+ */
+public class StaleLeaderEpochException extends ControlPlaneException {
+
+    private final Uuid topicId;
+    private final int partition;
+    private final int requestedEpoch;
+
+    public StaleLeaderEpochException(final Uuid topicId, final int partition, final int requestedEpoch) {
+        super(String.format("Stale leader epoch %d for topic %s partition %d",
+            requestedEpoch, topicId, partition));
+        this.topicId = topicId;
+        this.partition = partition;
+        this.requestedEpoch = requestedEpoch;
+    }
+
+    public Uuid topicId() {
+        return topicId;
+    }
+
+    public int partition() {
+        return partition;
+    }
+
+    public int requestedEpoch() {
+        return requestedEpoch;
+    }
+}
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJob.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJob.java
new file mode 100644
index 0000000000..18da56824f
--- /dev/null
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJob.java
@@ -0,0 +1,117 @@
+/*
+ * Inkless
+ * Copyright (C) 2026 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane.postgres;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Time;
+
+import org.jooq.Configuration;
+import org.jooq.DSLContext;
+import org.jooq.Field;
+import org.jooq.Row2;
+import org.jooq.impl.SQLDataType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+import io.aiven.inkless.control_plane.GetDisklessLogRequest;
+import io.aiven.inkless.control_plane.GetDisklessLogResponse;
+import io.aiven.inkless.control_plane.postgres.converters.UUIDtoUuidConverter;
+
+import static org.jooq.generated.Tables.LOGS;
+import static org.jooq.impl.DSL.field;
+import static org.jooq.impl.DSL.name;
+import static org.jooq.impl.DSL.row;
+import static org.jooq.impl.DSL.values;
+
+public class GetDisklessLogJob implements Callable<List<GetDisklessLogResponse>> {
+    private static final Field<UUID> REQUEST_TOPIC_ID = field(name("topic_id"), SQLDataType.UUID);
+    private static final Field<Integer> REQUEST_PARTITION = field(name("partition"), LOGS.PARTITION.getDataType());
+
+    private final Time time;
+    private final DSLContext jooqCtx;
+    private final List<GetDisklessLogRequest> requests;
+    private final Consumer<Long> durationCallback;
+
+    public GetDisklessLogJob(final Time time,
+                             final DSLContext jooqCtx,
+                             final List<GetDisklessLogRequest> requests,
+                             final Consumer<Long> durationCallback) {
+        this.time = time;
+        this.jooqCtx = jooqCtx;
+        this.requests = requests;
+        this.durationCallback = durationCallback;
+    }
+
+    @Override
+    public List<GetDisklessLogResponse> call() throws Exception {
+        if (requests.isEmpty()) {
+            return List.of();
+        }
+        return JobUtils.run(this::runOnce, time, durationCallback);
+    }
+
+    private List<GetDisklessLogResponse> runOnce() throws Exception {
+        return jooqCtx.transactionResult((final Configuration conf) -> {
+            final DSLContext context = conf.dsl();
+
+            final UUIDtoUuidConverter uuidConverter = new UUIDtoUuidConverter();
+            final var requestRows = requests.stream()
+                .map(req -> row(uuidConverter.to(req.topicId()), req.partition()))
+                .toArray(Row2[]::new);
+            @SuppressWarnings("unchecked")
+            final var requestsTable = values(requestRows)
+                .as("requests", REQUEST_TOPIC_ID.getName(), REQUEST_PARTITION.getName());
+
+            final var select = context.select(
+                requestsTable.field(REQUEST_TOPIC_ID),
+                requestsTable.field(REQUEST_PARTITION),
+                LOGS.LOG_START_OFFSET,
+                LOGS.HIGH_WATERMARK,
+                LOGS.DISKLESS_START_OFFSET
+            ).from(requestsTable)
+                .leftJoin(LOGS).on(LOGS.TOPIC_ID.coerce(SQLDataType.UUID).eq(requestsTable.field(REQUEST_TOPIC_ID))
+                    .and(LOGS.PARTITION.eq(requestsTable.field(REQUEST_PARTITION))));
+
+            final List<GetDisklessLogResponse> responses = new ArrayList<>();
+            try (final var cursor = select.fetchSize(1000).fetchLazy()) {
+                for (final var record : cursor) {
+                    final UUID rawTopicId = record.get(requestsTable.field(REQUEST_TOPIC_ID));
+                    final Uuid topicId = uuidConverter.from(rawTopicId);
+                    final Integer partition = record.get(requestsTable.field(REQUEST_PARTITION));
+                    final Long logStartOffset = record.get(LOGS.LOG_START_OFFSET);
+                    if (logStartOffset == null) {
+                        responses.add(GetDisklessLogResponse.unknownTopicOrPartition(topicId, partition));
+                    } else {
+                        responses.add(GetDisklessLogResponse.success(
+                            topicId,
+                            partition,
+                            logStartOffset,
+                            record.get(LOGS.HIGH_WATERMARK),
+                            record.get(LOGS.DISKLESS_START_OFFSET)
+                        ));
+                    }
+                }
+            }
+            return responses;
+        });
+    }
+}
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/InitDisklessLogJob.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/InitDisklessLogJob.java
new file mode 100644
index 0000000000..40b884f8fc
--- /dev/null
+++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/InitDisklessLogJob.java
@@ -0,0 +1,141 @@
+/*
+ * Inkless
+ * Copyright (C) 2026 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane.postgres;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Time;
+
+import org.jooq.Configuration;
+import org.jooq.DSLContext;
+import org.jooq.generated.udt.InitDisklessLogResponseV1;
+import org.jooq.generated.udt.records.InitDisklessLogProducerStateV1Record;
+import org.jooq.generated.udt.records.InitDisklessLogRequestV1Record;
+import org.jooq.generated.udt.records.InitDisklessLogResponseV1Record;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+import io.aiven.inkless.control_plane.DisklessLogAlreadyInitializedException;
+import io.aiven.inkless.control_plane.InitDisklessLogRequest;
+import io.aiven.inkless.control_plane.ProducerStateSnapshot;
+import io.aiven.inkless.control_plane.StaleLeaderEpochException;
+
+import static org.jooq.generated.Tables.INIT_DISKLESS_LOG_V1;
+
+public class InitDisklessLogJob implements Callable<Void> {
+
+    private final Time time;
+    private final DSLContext jooqCtx;
+    private final Set<InitDisklessLogRequest> requests;
+    private final Consumer<Long> durationCallback;
+
+    InitDisklessLogJob(final Time time,
+                       final DSLContext jooqCtx,
+                       final Set<InitDisklessLogRequest> requests,
+                       final Consumer<Long> durationCallback) {
+        this.time = time;
+        this.jooqCtx = jooqCtx;
+        this.requests = requests;
+        this.durationCallback = durationCallback;
+    }
+
+    @Override
+    public Void call() {
+        if (requests.isEmpty()) {
+            return null;
+        }
+        return JobUtils.run(this::runOnce, time, durationCallback);
+    }
+
+    private Void runOnce() {
+        final InitDisklessLogRequestV1Record[] jooqRequests = requests.stream()
+            .map(this::toJooqRequest)
+            .toArray(InitDisklessLogRequestV1Record[]::new);
+
+        final List<InitDisklessLogResponseV1Record> responses = jooqCtx.transactionResult(
+            (final Configuration conf) -> conf.dsl().select(
+                InitDisklessLogResponseV1.TOPIC_ID,
+                InitDisklessLogResponseV1.PARTITION,
+                InitDisklessLogResponseV1.ERROR
+            ).from(INIT_DISKLESS_LOG_V1.call(jooqRequests))
+                .fetchInto(InitDisklessLogResponseV1Record.class)
+        );
+
+        for (final var response : responses) {
+            switch (response.getError()) {
+                case none -> { }
+                case stale_leader_epoch -> throw new StaleLeaderEpochException(
+                    response.getTopicId(),
+                    response.getPartition(),
+                    findLeaderEpoch(response.getTopicId(), response.getPartition())
+                );
+                case invalid_state -> throw new IllegalStateException(String.format(
+                    "Invalid state for topic %s partition %d: disklessStartOffset > highWatermark",
+                    response.getTopicId(), response.getPartition()
+                ));
+                case already_initialized -> throw new
DisklessLogAlreadyInitializedException( + response.getTopicId(), + response.getPartition() + ); + } + } + return null; + } + + private InitDisklessLogRequestV1Record toJooqRequest(final InitDisklessLogRequest request) { + final InitDisklessLogProducerStateV1Record[] producerState; + if (request.producerStateEntries() == null || request.producerStateEntries().isEmpty()) { + producerState = new InitDisklessLogProducerStateV1Record[0]; + } else { + producerState = request.producerStateEntries().stream() + .map(this::toJooqProducerState) + .toArray(InitDisklessLogProducerStateV1Record[]::new); + } + + return new InitDisklessLogRequestV1Record( + request.topicId(), + request.partition(), + request.topicName(), + request.logStartOffset(), + request.disklessStartOffset(), + request.leaderEpoch(), + producerState + ); + } + + private InitDisklessLogProducerStateV1Record toJooqProducerState(final ProducerStateSnapshot entry) { + return new InitDisklessLogProducerStateV1Record( + entry.producerId(), + entry.producerEpoch(), + entry.baseSequence(), + entry.lastSequence(), + entry.assignedOffset(), + entry.batchMaxTimestamp() + ); + } + + private int findLeaderEpoch(final Uuid topicId, final int partition) { + return requests.stream() + .filter(r -> r.topicId().equals(topicId) && r.partition() == partition) + .findFirst() + .map(InitDisklessLogRequest::leaderEpoch) + .orElse(-1); + } +} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlane.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlane.java index e76c869315..9d641236f4 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlane.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlane.java @@ -57,8 +57,11 @@ import io.aiven.inkless.control_plane.FileToDelete; import io.aiven.inkless.control_plane.FindBatchRequest; import io.aiven.inkless.control_plane.FindBatchResponse; +import io.aiven.inkless.control_plane.GetDisklessLogRequest; +import io.aiven.inkless.control_plane.GetDisklessLogResponse; import io.aiven.inkless.control_plane.GetLogInfoRequest; import io.aiven.inkless.control_plane.GetLogInfoResponse; +import io.aiven.inkless.control_plane.InitDisklessLogRequest; import io.aiven.inkless.control_plane.ListOffsetsRequest; import io.aiven.inkless.control_plane.ListOffsetsResponse; import io.aiven.inkless.control_plane.MergedFileBatch; @@ -149,6 +152,12 @@ public void createTopicAndPartitions(final Set new TopicsAndPartitionsCreateJob(time, jobsJooqCtx, requests, pgMetrics::onTopicCreateCompleted).run(); } + @Override + public void initDisklessLog(final Set requests) { + // Expected to be performed synchronously + new InitDisklessLogJob(time, jobsJooqCtx, requests, pgMetrics::onInitDisklessLogCompleted).call(); + } + @Override protected Iterator commitFileForValidRequests( final String objectKey, @@ -333,6 +342,20 @@ public boolean isSafeToDeleteFile(String objectKeyPath) { } } + @Override + public List getDisklessLog(final List requests) { + try { + final GetDisklessLogJob job = new GetDisklessLogJob(time, readJooqCtx, requests, pgMetrics::onGetDisklessLogCompleted); + return job.call(); + } catch (final Exception e) { + if (e instanceof ControlPlaneException) { + throw (ControlPlaneException) e; + } else { + throw new ControlPlaneException("Failed to get diskless log", e); + } + } + } + @Override public List getLogInfo(final List requests) { try { diff --git 
a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlaneMetrics.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlaneMetrics.java index 3f8d55738e..89e1f76d8f 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlaneMetrics.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlaneMetrics.java @@ -36,6 +36,7 @@ public class PostgresControlPlaneMetrics implements Closeable { private final QueryMetrics commitFileMetrics = new QueryMetrics("CommitFile"); private final QueryMetrics commitFileMergeWorkItemMetrics = new QueryMetrics("CommitFileMergeWorkItem"); private final QueryMetrics topicCreateMetrics = new QueryMetrics("TopicCreate"); + private final QueryMetrics initDisklessLogMetrics = new QueryMetrics("InitDisklessLog"); private final QueryMetrics topicDeleteMetrics = new QueryMetrics("TopicDelete"); private final QueryMetrics fileDeleteMetrics = new QueryMetrics("FilesDelete"); private final QueryMetrics listOffsetsMetrics = new QueryMetrics("ListOffsets"); @@ -46,6 +47,7 @@ public class PostgresControlPlaneMetrics implements Closeable { private final QueryMetrics releaseFileMergeWorkItemMetrics = new QueryMetrics("ReleaseFileMergeWorkItem"); private final QueryMetrics safeDeleteFileCheckMetrics = new QueryMetrics("SafeDeleteFileCheck"); private final QueryMetrics getLogInfoMetrics = new QueryMetrics("GetLogInfo"); + private final QueryMetrics getDisklessLogMetrics = new QueryMetrics("GetDisklessLog"); public PostgresControlPlaneMetrics(Time time) { this.time = Objects.requireNonNull(time, "time cannot be null"); @@ -75,6 +77,10 @@ public void onTopicCreateCompleted(Long duration) { topicCreateMetrics.record(duration); } + public void onInitDisklessLogCompleted(Long duration) { + initDisklessLogMetrics.record(duration); + } + public void onFilesDeleteCompleted(Long duration) { fileDeleteMetrics.record(duration); } @@ -111,6 +117,10 @@ public void onGetLogInfoCompleted(Long duration) { getLogInfoMetrics.record(duration); } + public void onGetDisklessLogCompleted(Long duration) { + getDisklessLogMetrics.record(duration); + } + @Override public void close() { findBatchesMetrics.remove(); @@ -118,6 +128,7 @@ public void close() { commitFileMetrics.remove(); commitFileMergeWorkItemMetrics.remove(); topicCreateMetrics.remove(); + initDisklessLogMetrics.remove(); topicDeleteMetrics.remove(); fileDeleteMetrics.remove(); listOffsetsMetrics.remove(); @@ -128,6 +139,7 @@ public void close() { releaseFileMergeWorkItemMetrics.remove(); safeDeleteFileCheckMetrics.remove(); getLogInfoMetrics.remove(); + getDisklessLogMetrics.remove(); } private class QueryMetrics { diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJob.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJob.java index 6e77d0aad4..c699f04dc8 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJob.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJob.java @@ -69,10 +69,11 @@ private void runOnce() { LOGS.TOPIC_NAME, LOGS.LOG_START_OFFSET, LOGS.HIGH_WATERMARK, - LOGS.BYTE_SIZE); + LOGS.BYTE_SIZE, + LOGS.DISKLESS_START_OFFSET); for (final var request : requests) { for (int partition = 0; partition < request.numPartitions(); partition++) { - insertStep = 
insertStep.values(request.topicId(), partition, request.topicName(), 0L, 0L, 0L); + insertStep = insertStep.values(request.topicId(), partition, request.topicName(), 0L, 0L, 0L, 0L); } } final int rowsInserted = insertStep.onConflictDoNothing().execute(); diff --git a/storage/inkless/src/main/resources/db/migration/V11__Add_diskless_offsets_to_logs.sql b/storage/inkless/src/main/resources/db/migration/V11__Add_diskless_offsets_to_logs.sql new file mode 100644 index 0000000000..ee69f0b438 --- /dev/null +++ b/storage/inkless/src/main/resources/db/migration/V11__Add_diskless_offsets_to_logs.sql @@ -0,0 +1,154 @@ +-- Copyright (c) 2026 Aiven, Helsinki, Finland. https://aiven.io/ +CREATE DOMAIN leader_epoch_t AS INT NOT NULL +CHECK (VALUE >= 0); + +ALTER TABLE logs ADD COLUMN diskless_start_offset offset_t DEFAULT 0; +ALTER TABLE logs ADD COLUMN diskless_end_offset offset_nullable_t DEFAULT NULL; +ALTER TABLE logs ADD COLUMN leader_epoch_at_init leader_epoch_t DEFAULT 0; + +CREATE TYPE init_diskless_log_producer_state_v1 AS ( + producer_id producer_id_t, + producer_epoch producer_epoch_t, + base_sequence sequence_t, + last_sequence sequence_t, + assigned_offset offset_t, + batch_max_timestamp timestamp_t +); + +CREATE TYPE init_diskless_log_request_v1 AS ( + topic_id topic_id_t, + partition partition_t, + topic_name topic_name_t, + log_start_offset offset_t, + diskless_start_offset offset_t, + leader_epoch leader_epoch_t, + producer_state init_diskless_log_producer_state_v1[] +); + +CREATE TYPE init_diskless_log_response_error_v1 AS ENUM ( + 'none', + 'stale_leader_epoch', + 'already_initialized', + 'invalid_state' +); + +CREATE TYPE init_diskless_log_response_v1 AS ( + topic_id topic_id_t, + partition partition_t, + error init_diskless_log_response_error_v1 +); + +-- Init diskless log function: +-- - Rejects with stale_leader_epoch if leader_epoch < leader_epoch_at_init +-- - Rejects with invalid_state if diskless_start_offset > high_watermark (corrupted state) +-- - Rejects with already_initialized if messages have been appended (diskless_start_offset < high_watermark) +CREATE FUNCTION init_diskless_log_v1( + arg_requests init_diskless_log_request_v1[] +) +RETURNS SETOF init_diskless_log_response_v1 LANGUAGE plpgsql VOLATILE AS $$ +DECLARE + l_request RECORD; + l_existing_log RECORD; + l_producer_state RECORD; +BEGIN + FOR l_request IN + SELECT * + FROM unnest(arg_requests) + LOOP + -- Check if log already exists + SELECT topic_id, partition, leader_epoch_at_init, diskless_start_offset, high_watermark + INTO l_existing_log + FROM logs + WHERE topic_id = l_request.topic_id + AND partition = l_request.partition; + + IF FOUND THEN + -- Check if leader epoch is stale + IF l_request.leader_epoch < l_existing_log.leader_epoch_at_init THEN + RETURN NEXT (l_request.topic_id, l_request.partition, 'stale_leader_epoch')::init_diskless_log_response_v1; + CONTINUE; + END IF; + + -- Check for invalid state: diskless_start_offset should never exceed high_watermark + IF l_existing_log.diskless_start_offset > l_existing_log.high_watermark THEN + RETURN NEXT (l_request.topic_id, l_request.partition, 'invalid_state')::init_diskless_log_response_v1; + CONTINUE; + END IF; + + -- Check if messages have been appended (no longer in migration phase) + IF l_existing_log.diskless_start_offset < l_existing_log.high_watermark THEN + RETURN NEXT (l_request.topic_id, l_request.partition, 'already_initialized')::init_diskless_log_response_v1; + CONTINUE; + END IF; + + -- Still in migration phase with valid epoch - update 
existing log + UPDATE logs + SET log_start_offset = l_request.log_start_offset, + high_watermark = l_request.diskless_start_offset, + diskless_start_offset = l_request.diskless_start_offset, + leader_epoch_at_init = l_request.leader_epoch + WHERE topic_id = l_request.topic_id + AND partition = l_request.partition; + + -- Delete existing producer state for this partition + DELETE FROM producer_state + WHERE topic_id = l_request.topic_id + AND partition = l_request.partition; + ELSE + -- Insert new log record + INSERT INTO logs ( + topic_id, + partition, + topic_name, + log_start_offset, + high_watermark, + byte_size, + diskless_start_offset, + leader_epoch_at_init + ) + VALUES ( + l_request.topic_id, + l_request.partition, + l_request.topic_name, + l_request.log_start_offset, + l_request.diskless_start_offset, + 0, + l_request.diskless_start_offset, + l_request.leader_epoch + ); + END IF; + + -- Insert producer state entries + IF l_request.producer_state IS NOT NULL THEN + FOR l_producer_state IN + SELECT * + FROM unnest(l_request.producer_state) + LOOP + INSERT INTO producer_state ( + topic_id, + partition, + producer_id, + producer_epoch, + base_sequence, + last_sequence, + assigned_offset, + batch_max_timestamp + ) + VALUES ( + l_request.topic_id, + l_request.partition, + l_producer_state.producer_id, + l_producer_state.producer_epoch, + l_producer_state.base_sequence, + l_producer_state.last_sequence, + l_producer_state.assigned_offset, + l_producer_state.batch_max_timestamp + ); + END LOOP; + END IF; + + RETURN NEXT (l_request.topic_id, l_request.partition, 'none')::init_diskless_log_response_v1; + END LOOP; +END; +$$ +; \ No newline at end of file diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/CommitFileJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/CommitFileJobTest.java index 77daba9322..a255026231 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/CommitFileJobTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/CommitFileJobTest.java @@ -112,9 +112,9 @@ void simpleCommit() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L, 0L, null, 0) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -168,9 +168,9 @@ void commitMultipleFiles() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 159L, 111L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L + 245, 322L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 159L, 111L, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L + 245, 322L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L, 0L, null, 0) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -215,9 +215,9 @@ void nonExistentPartition() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L), - new LogsRecord(TOPIC_ID_1, 0, 
TOPIC_1, 0L, 27L, 50L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L, 0L, null, 0) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -249,9 +249,9 @@ void simpleIdempotentCommit() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L, 0L, null, 0) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -283,9 +283,9 @@ void inSequenceCommit() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 27L, 150L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 27L, 150L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L, 0L, null, 0) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -325,9 +325,9 @@ void outOfOrderCommit(int lastBatchSequence, int firstBatchSequence) { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L, 0L, null, 0) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -357,9 +357,9 @@ void outOfOrderCommitNewEpoch() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L, 0L, null, 0) ); // The file will be deleted because its only batch is rejected. 
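Editor's note: the LogsRecord changes in these tests all follow one mechanical pattern, sketched below; LogsRecord is the jOOQ-generated record for the logs table, and the trailing defaults match the V11 migration's column defaults.

    // Before V11 the generated constructor took six columns:
    //   new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L)
    // After V11, three columns are appended; for logs untouched by init they hold the defaults:
    new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L,
        0L,    // diskless_start_offset (offset_t, DEFAULT 0)
        null,  // diskless_end_offset (offset_nullable_t, DEFAULT NULL)
        0);    // leader_epoch_at_init (leader_epoch_t, DEFAULT 0)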
@@ -389,9 +389,9 @@ void invalidProducerEpoch() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L, 0L, null, 0) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DBUtils.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DBUtils.java index 4b532737b5..770cbd7f6c 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DBUtils.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DBUtils.java @@ -18,6 +18,8 @@ package io.aiven.inkless.control_plane.postgres; +import org.apache.kafka.common.Uuid; + import com.zaxxer.hikari.HikariDataSource; import org.jooq.DSLContext; @@ -26,17 +28,20 @@ import org.jooq.generated.tables.records.BatchesRecord; import org.jooq.generated.tables.records.FilesRecord; import org.jooq.generated.tables.records.LogsRecord; +import org.jooq.generated.tables.records.ProducerStateRecord; import org.jooq.impl.DSL; import org.jooq.impl.TableImpl; import java.sql.Connection; import java.sql.SQLException; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; import static org.jooq.generated.Tables.BATCHES; import static org.jooq.generated.Tables.FILES; import static org.jooq.generated.Tables.LOGS; +import static org.jooq.generated.Tables.PRODUCER_STATE; import static org.jooq.impl.DSL.asterisk; public class DBUtils { @@ -52,6 +57,52 @@ static Set getAllBatches(final HikariDataSource hikariDataSource) return getAll(hikariDataSource, BATCHES, BatchesRecord.class); } + static List getAllProducerState(final HikariDataSource hikariDataSource) { + try (final Connection connection = hikariDataSource.getConnection()) { + final DSLContext ctx = DSL.using(connection, SQLDialect.POSTGRES); + return ctx.select(asterisk()) + .from(PRODUCER_STATE) + .orderBy(PRODUCER_STATE.TOPIC_ID, PRODUCER_STATE.PARTITION, PRODUCER_STATE.PRODUCER_ID, PRODUCER_STATE.ROW_ID) + .fetchInto(ProducerStateRecord.class); + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } + + static Integer getLeaderEpochAtInit(final HikariDataSource hikariDataSource, final Uuid topicId, final int partition) { + try (final Connection connection = hikariDataSource.getConnection()) { + final DSLContext ctx = DSL.using(connection, SQLDialect.POSTGRES); + return ctx.select(LOGS.LEADER_EPOCH_AT_INIT) + .from(LOGS) + .where(LOGS.TOPIC_ID.eq(topicId) + .and(LOGS.PARTITION.eq(partition))) + .fetchOne(LOGS.LEADER_EPOCH_AT_INIT); + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } + + /** + * Simulates messages being appended by updating the high_watermark. + * This makes diskless_start_offset < high_watermark, indicating the log is no longer in migration phase. 
+ */ + static void simulateMessagesAppended(final HikariDataSource hikariDataSource, final Uuid topicId, final int partition, final long newHighWatermark) { + try (final Connection connection = hikariDataSource.getConnection()) { + final DSLContext ctx = DSL.using(connection, SQLDialect.POSTGRES); + final int rowsUpdated = ctx.update(LOGS) + .set(LOGS.HIGH_WATERMARK, newHighWatermark) + .where(LOGS.TOPIC_ID.eq(topicId) + .and(LOGS.PARTITION.eq(partition))) + .execute(); + if (rowsUpdated == 0) { + throw new RuntimeException("No rows updated - log entry not found for topic " + topicId + " partition " + partition); + } + connection.commit(); + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } + private static Set getAll(final HikariDataSource hikariDataSource, final TableImpl table, final Class recordClass) { diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteRecordsJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteRecordsJobTest.java index b6eb541544..2b960fe1d4 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteRecordsJobTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteRecordsJobTest.java @@ -185,10 +185,10 @@ void deleteRecordsFromMultipleTopics(final List order) { assertThat(responses).containsExactlyElementsOf(expectedResponses); assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 18L, 36L, (long) file2Batch1Size + file3Batch1Size), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 24L, 24L, 0L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_2, 0, TOPIC_2, 0L, 24L, (long) file2Batch2Size + file3Batch3Size) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 18L, 36L, (long) file2Batch1Size + file3Batch1Size, 0L, null, 0), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 24L, 24L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L, 0L, null, 0), + new LogsRecord(TOPIC_ID_2, 0, TOPIC_2, 0L, 24L, (long) file2Batch2Size + file3Batch3Size, 0L, null, 0) ); assertThat(DBUtils.getAllBatches(pgContainer.getDataSource())).containsExactlyInAnyOrder( diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteTopicJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteTopicJobTest.java index 522d8d6592..355ddfbf55 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteTopicJobTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteTopicJobTest.java @@ -149,7 +149,7 @@ void deleteMultipleTopics() { // The logs of the deleted topics must be gone, i.e. only TOPIC_2 remains. assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactly( - new LogsRecord(TOPIC_ID_2, 0, TOPIC_2, 0L, 24L, (long) file2Batch2Size + file3Batch3Size) + new LogsRecord(TOPIC_ID_2, 0, TOPIC_2, 0L, 24L, (long) file2Batch2Size + file3Batch3Size, 0L, null, 0) ); // The batches of the deleted topics must be gone, i.e. only TOPIC_2 remains. 
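Editor's note: before the new test files, a hedged sketch of the init/get round trip they exercise; `InitGetRoundTrip` is illustrative only, and `controlPlane` stands for any ControlPlane implementation (in-memory or Postgres-backed).

    import java.util.List;
    import java.util.Set;

    import org.apache.kafka.common.Uuid;

    import io.aiven.inkless.control_plane.ControlPlane;
    import io.aiven.inkless.control_plane.GetDisklessLogRequest;
    import io.aiven.inkless.control_plane.GetDisklessLogResponse;
    import io.aiven.inkless.control_plane.InitDisklessLogRequest;

    final class InitGetRoundTrip {
        static GetDisklessLogResponse roundTrip(final ControlPlane controlPlane, final Uuid topicId) {
            // Initialize: logStartOffset=100, disklessStartOffset=highWatermark=200, leaderEpoch=5.
            controlPlane.initDisklessLog(Set.of(
                new InitDisklessLogRequest(topicId, "topic1", 0, 100L, 200L, 5, List.of())));
            // Read back: after init, disklessStartOffset equals the stored high watermark.
            return controlPlane.getDisklessLog(List.of(new GetDisklessLogRequest(topicId, 0))).get(0);
        }
    }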
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJobTest.java new file mode 100644 index 0000000000..941a5586b3 --- /dev/null +++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJobTest.java @@ -0,0 +1,203 @@ +/* + * Inkless + * Copyright (C) 2026 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.control_plane.postgres; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Time; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.util.List; +import java.util.Set; + +import io.aiven.inkless.control_plane.GetDisklessLogRequest; +import io.aiven.inkless.control_plane.GetDisklessLogResponse; +import io.aiven.inkless.control_plane.InitDisklessLogRequest; +import io.aiven.inkless.test_utils.InklessPostgreSQLContainer; +import io.aiven.inkless.test_utils.PostgreSQLTestContainer; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) +class GetDisklessLogJobTest { + @Container + static final InklessPostgreSQLContainer pgContainer = PostgreSQLTestContainer.container(); + + static final String TOPIC_1 = "topic1"; + static final Uuid TOPIC_ID1 = new Uuid(10, 12); + static final String TOPIC_2 = "topic2"; + static final Uuid TOPIC_ID2 = new Uuid(20, 24); + + @BeforeEach + void setUp(final TestInfo testInfo) { + pgContainer.createDatabase(testInfo); + pgContainer.migrate(); + } + + @AfterEach + void tearDown() { + pgContainer.tearDown(); + } + + @Test + void emptyRequestList() throws Exception { + final GetDisklessLogJob job = new GetDisklessLogJob( + Time.SYSTEM, pgContainer.getJooqCtx(), List.of(), durationMs -> {}); + final List responses = job.call(); + assertThat(responses).isEmpty(); + } + + @Test + void unknownTopicOrPartition() throws Exception { + // Query for a partition that doesn't exist + final List requests = List.of( + new GetDisklessLogRequest(TOPIC_ID1, 0) + ); + final GetDisklessLogJob job = new GetDisklessLogJob( + Time.SYSTEM, pgContainer.getJooqCtx(), requests, durationMs -> {}); + final List responses = job.call(); + + assertThat(responses).hasSize(1); + final GetDisklessLogResponse response = responses.get(0); + 
assertThat(response.topicId()).isEqualTo(TOPIC_ID1); + assertThat(response.partition()).isEqualTo(0); + assertThat(response.error()).isEqualTo(Errors.UNKNOWN_TOPIC_OR_PARTITION); + assertThat(response.logStartOffset()).isEqualTo(GetDisklessLogResponse.INVALID_OFFSET); + assertThat(response.highWatermark()).isEqualTo(GetDisklessLogResponse.INVALID_OFFSET); + assertThat(response.disklessStartOffset()).isEqualTo(GetDisklessLogResponse.INVALID_OFFSET); + } + + @Test + void getExistingLog() throws Exception { + // First, create a log entry + final Set initRequests = Set.of( + new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 200L, 5, List.of()) + ); + new InitDisklessLogJob(Time.SYSTEM, pgContainer.getJooqCtx(), initRequests, durationMs -> {}).call(); + + // Now query for it + final List requests = List.of( + new GetDisklessLogRequest(TOPIC_ID1, 0) + ); + final GetDisklessLogJob job = new GetDisklessLogJob( + Time.SYSTEM, pgContainer.getJooqCtx(), requests, durationMs -> {}); + final List responses = job.call(); + + assertThat(responses).hasSize(1); + final GetDisklessLogResponse response = responses.get(0); + assertThat(response.topicId()).isEqualTo(TOPIC_ID1); + assertThat(response.partition()).isEqualTo(0); + assertThat(response.error()).isEqualTo(Errors.NONE); + assertThat(response.logStartOffset()).isEqualTo(100L); + assertThat(response.highWatermark()).isEqualTo(200L); + // disklessStartOffset is set to highWatermark when initialized + assertThat(response.disklessStartOffset()).isEqualTo(200L); + } + + @Test + void getMultiplePartitions() throws Exception { + // Create multiple log entries + final Set initRequests = Set.of( + new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 200L, 5, List.of()), + new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 1, 50L, 150L, 3, List.of()), + new InitDisklessLogRequest(TOPIC_ID2, TOPIC_2, 0, 0L, 500L, 1, List.of()) + ); + new InitDisklessLogJob(Time.SYSTEM, pgContainer.getJooqCtx(), initRequests, durationMs -> {}).call(); + + // Query for all of them + final List requests = List.of( + new GetDisklessLogRequest(TOPIC_ID1, 0), + new GetDisklessLogRequest(TOPIC_ID1, 1), + new GetDisklessLogRequest(TOPIC_ID2, 0) + ); + final GetDisklessLogJob job = new GetDisklessLogJob( + Time.SYSTEM, pgContainer.getJooqCtx(), requests, durationMs -> {}); + final List responses = job.call(); + + assertThat(responses).hasSize(3); + + // Check topic1 partition 0 + final GetDisklessLogResponse topic1Part0 = responses.stream() + .filter(r -> r.topicId().equals(TOPIC_ID1) && r.partition() == 0) + .findFirst() + .orElseThrow(); + assertThat(topic1Part0.error()).isEqualTo(Errors.NONE); + assertThat(topic1Part0.logStartOffset()).isEqualTo(100L); + assertThat(topic1Part0.highWatermark()).isEqualTo(200L); + assertThat(topic1Part0.disklessStartOffset()).isEqualTo(200L); + + // Check topic1 partition 1 + final GetDisklessLogResponse topic1Part1 = responses.stream() + .filter(r -> r.topicId().equals(TOPIC_ID1) && r.partition() == 1) + .findFirst() + .orElseThrow(); + assertThat(topic1Part1.error()).isEqualTo(Errors.NONE); + assertThat(topic1Part1.logStartOffset()).isEqualTo(50L); + assertThat(topic1Part1.highWatermark()).isEqualTo(150L); + assertThat(topic1Part1.disklessStartOffset()).isEqualTo(150L); + + // Check topic2 partition 0 + final GetDisklessLogResponse topic2Part0 = responses.stream() + .filter(r -> r.topicId().equals(TOPIC_ID2) && r.partition() == 0) + .findFirst() + .orElseThrow(); + assertThat(topic2Part0.error()).isEqualTo(Errors.NONE); + 
+    @Test
+    void duplicateRequestsReturnDuplicateResponses() throws Exception {
+        // Create a log entry
+        final Set<InitDisklessLogRequest> initRequests = Set.of(
+            new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 200L, 5, List.of())
+        );
+        new InitDisklessLogJob(Time.SYSTEM, pgContainer.getJooqCtx(), initRequests, durationMs -> {}).call();
+
+        // Query for the same partition twice
+        final List<GetDisklessLogRequest> requests = List.of(
+            new GetDisklessLogRequest(TOPIC_ID1, 0),
+            new GetDisklessLogRequest(TOPIC_ID1, 0)
+        );
+        final GetDisklessLogJob job = new GetDisklessLogJob(
+            Time.SYSTEM, pgContainer.getJooqCtx(), requests, durationMs -> {});
+        final List<GetDisklessLogResponse> responses = job.call();
+
+        assertThat(responses).hasSize(2);
+        for (final GetDisklessLogResponse response : responses) {
+            assertThat(response.topicId()).isEqualTo(TOPIC_ID1);
+            assertThat(response.partition()).isEqualTo(0);
+            assertThat(response.error()).isEqualTo(Errors.NONE);
+            assertThat(response.logStartOffset()).isEqualTo(100L);
+            assertThat(response.highWatermark()).isEqualTo(200L);
+        }
+    }
+}
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/InitDisklessLogJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/InitDisklessLogJobTest.java
new file mode 100644
index 0000000000..e4b4eb378c
--- /dev/null
+++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/InitDisklessLogJobTest.java
@@ -0,0 +1,229 @@
+/*
+ * Inkless
+ * Copyright (C) 2024 - 2025 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.control_plane.postgres;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Time;
+
+import org.jooq.generated.tables.records.ProducerStateRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.util.List;
+import java.util.Set;
+
+import io.aiven.inkless.control_plane.DisklessLogAlreadyInitializedException;
+import io.aiven.inkless.control_plane.InitDisklessLogRequest;
+import io.aiven.inkless.control_plane.ProducerStateSnapshot;
+import io.aiven.inkless.control_plane.StaleLeaderEpochException;
+import io.aiven.inkless.test_utils.InklessPostgreSQLContainer;
+import io.aiven.inkless.test_utils.PostgreSQLTestContainer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+@Testcontainers
+class InitDisklessLogJobTest {
+    @Container
+    static final InklessPostgreSQLContainer pgContainer = PostgreSQLTestContainer.container();
+
+    static final String TOPIC_1 = "topic1";
+    static final Uuid TOPIC_ID1 = new Uuid(10, 12);
+
+    @BeforeEach
+    void setUp(final TestInfo testInfo) {
+        pgContainer.createDatabase(testInfo);
+        pgContainer.migrate();
+    }
+
+    @AfterEach
+    void tearDown() {
+        pgContainer.tearDown();
+    }
+
+    private void runInitJob(final Set<InitDisklessLogRequest> requests) {
+        new InitDisklessLogJob(Time.SYSTEM, pgContainer.getJooqCtx(), requests, durationMs -> {}).call();
+    }
+
+    @Test
+    void initializeNewLog() {
+        final List<ProducerStateSnapshot> producerStateEntries = List.of(
+            new ProducerStateSnapshot(1001L, (short) 1, 0, 9, 100L, 1000L),
+            new ProducerStateSnapshot(1002L, (short) 2, 0, 4, 110L, 2000L)
+        );
+        runInitJob(Set.of(
+            new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 200L, 5, producerStateEntries)
+        ));
+
+        // Verify log was created
+        final var logs = DBUtils.getAllLogs(pgContainer.getDataSource());
+        assertThat(logs).hasSize(1);
+        final var log = logs.iterator().next();
+        assertThat(log.getTopicId()).isEqualTo(TOPIC_ID1);
+        assertThat(log.getPartition()).isEqualTo(0);
+        assertThat(log.getTopicName()).isEqualTo(TOPIC_1);
+        assertThat(log.getLogStartOffset()).isEqualTo(100L);
+        assertThat(log.getHighWatermark()).isEqualTo(200L);
+        assertThat(log.getDisklessStartOffset()).isEqualTo(200L);
+        assertThat(DBUtils.getLeaderEpochAtInit(pgContainer.getDataSource(), TOPIC_ID1, 0)).isEqualTo(5);
+
+        // Verify producer state was created
+        final var producerStates = DBUtils.getAllProducerState(pgContainer.getDataSource());
+        assertThat(producerStates).hasSize(2);
+
+        final ProducerStateRecord state1 = producerStates.stream()
+            .filter(s -> s.getProducerId() == 1001L)
+            .findFirst()
+            .orElseThrow();
+        assertThat(state1.getProducerEpoch()).isEqualTo((short) 1);
+        assertThat(state1.getBaseSequence()).isEqualTo(0);
+        assertThat(state1.getLastSequence()).isEqualTo(9);
+        assertThat(state1.getAssignedOffset()).isEqualTo(100L);
+        assertThat(state1.getBatchMaxTimestamp()).isEqualTo(1000L);
+
+        final ProducerStateRecord state2 = producerStates.stream()
+            .filter(s -> s.getProducerId() == 1002L)
+            .findFirst()
+            .orElseThrow();
+        assertThat(state2.getProducerEpoch()).isEqualTo((short) 2);
+        assertThat(state2.getBaseSequence()).isEqualTo(0);
+        assertThat(state2.getLastSequence()).isEqualTo(4);
+        assertThat(state2.getAssignedOffset()).isEqualTo(110L);
+        assertThat(state2.getBatchMaxTimestamp()).isEqualTo(2000L);
+    }
+
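+    // The re-initialization rules exercised by the tests below (as implied by the
+    // exception types): a lower leader epoch is fenced with StaleLeaderEpochException,
+    // and once the high watermark has moved past disklessStartOffset (i.e. diskless
+    // data exists), re-init is rejected with DisklessLogAlreadyInitializedException.
+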
+    @Test
+    void throwsExceptionWhenStaleLeaderEpoch() {
+        // First initialization with leader epoch 5
+        runInitJob(Set.of(new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 200L, 5, List.of())));
+
+        // Second initialization attempt with lower epoch (should throw StaleLeaderEpochException)
+        final var secondRequests = Set.of(new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 300L, 400L, 3, List.of()));
+        assertThatThrownBy(() -> runInitJob(secondRequests))
+            .isInstanceOf(StaleLeaderEpochException.class)
+            .satisfies(e -> {
+                final StaleLeaderEpochException ex = (StaleLeaderEpochException) e;
+                assertThat(ex.topicId()).isEqualTo(TOPIC_ID1);
+                assertThat(ex.partition()).isEqualTo(0);
+                assertThat(ex.requestedEpoch()).isEqualTo(3);
+            });
+
+        // Verify the log still has the original values
+        final var logs = DBUtils.getAllLogs(pgContainer.getDataSource());
+        assertThat(logs).hasSize(1);
+        final var log = logs.iterator().next();
+        assertThat(log.getLogStartOffset()).isEqualTo(100L);
+        assertThat(log.getHighWatermark()).isEqualTo(200L);
+        assertThat(DBUtils.getLeaderEpochAtInit(pgContainer.getDataSource(), TOPIC_ID1, 0)).isEqualTo(5);
+    }
+
+    @Test
+    void throwsExceptionWhenMessagesAppended() {
+        // First initialization with leader epoch 5
+        final var producerState = List.of(new ProducerStateSnapshot(1001L, (short) 1, 0, 9, 100L, 1000L));
+        runInitJob(Set.of(new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 200L, 5, producerState)));
+
+        // Simulate messages being appended (high_watermark moves past diskless_start_offset)
+        DBUtils.simulateMessagesAppended(pgContainer.getDataSource(), TOPIC_ID1, 0, 300L);
+
+        // Second initialization with higher leader epoch should fail because messages have been appended
+        final var secondRequests = Set.of(new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 300L, 400L, 7, List.of()));
+        assertThatThrownBy(() -> runInitJob(secondRequests))
+            .isInstanceOf(DisklessLogAlreadyInitializedException.class)
+            .satisfies(e -> {
+                final var ex = (DisklessLogAlreadyInitializedException) e;
+                assertThat(ex.topicId()).isEqualTo(TOPIC_ID1);
+                assertThat(ex.partition()).isEqualTo(0);
+            });
+
+        // Verify the log still has the original values (except high_watermark which was updated by simulation)
+        final var log = DBUtils.getAllLogs(pgContainer.getDataSource()).iterator().next();
+        assertThat(log.getLogStartOffset()).isEqualTo(100L);
+        assertThat(log.getHighWatermark()).isEqualTo(300L);
+        assertThat(log.getDisklessStartOffset()).isEqualTo(200L);
+        assertThat(DBUtils.getLeaderEpochAtInit(pgContainer.getDataSource(), TOPIC_ID1, 0)).isEqualTo(5);
+
+        // Verify producer state was not changed
+        assertThat(DBUtils.getAllProducerState(pgContainer.getDataSource()))
+            .hasSize(1)
+            .extracting(ProducerStateRecord::getProducerId)
+            .containsExactly(1001L);
+    }
+
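+    // "Migration phase" in the test below is taken to mean high_watermark ==
+    // diskless_start_offset, i.e. no diskless batches have been appended yet;
+    // re-initialization with an equal or higher leader epoch is then expected to
+    // replace the log metadata and producer state wholesale.
+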
+    @Test
+    void updateLogInMigrationPhase() {
+        // First initialization with leader epoch 5
+        final var producerState = List.of(new ProducerStateSnapshot(1001L, (short) 1, 0, 9, 100L, 1000L));
+        runInitJob(Set.of(new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 200L, 5, producerState)));
+        assertThat(DBUtils.getAllProducerState(pgContainer.getDataSource())).hasSize(1);
+
+        // Second initialization with equal leader epoch while still in migration phase - should succeed
+        runInitJob(Set.of(new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 150L, 250L, 5, List.of())));
+
+        // Verify the log was updated and producer state was cleared
+        var log = DBUtils.getAllLogs(pgContainer.getDataSource()).iterator().next();
+        assertThat(log.getLogStartOffset()).isEqualTo(150L);
+        assertThat(log.getHighWatermark()).isEqualTo(250L);
+        assertThat(log.getDisklessStartOffset()).isEqualTo(250L);
+        assertThat(DBUtils.getLeaderEpochAtInit(pgContainer.getDataSource(), TOPIC_ID1, 0)).isEqualTo(5);
+        assertThat(DBUtils.getAllProducerState(pgContainer.getDataSource())).isEmpty();
+
+        // Third initialization with higher leader epoch while still in migration phase - should succeed
+        final var newProducerState = List.of(
+            new ProducerStateSnapshot(2001L, (short) 3, 0, 14, 300L, 3000L),
+            new ProducerStateSnapshot(2002L, (short) 1, 5, 10, 320L, 3200L)
+        );
+        runInitJob(Set.of(new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 300L, 400L, 7, newProducerState)));
+
+        // Verify the log was updated
+        log = DBUtils.getAllLogs(pgContainer.getDataSource()).iterator().next();
+        assertThat(log.getLogStartOffset()).isEqualTo(300L);
+        assertThat(log.getHighWatermark()).isEqualTo(400L);
+        assertThat(log.getDisklessStartOffset()).isEqualTo(400L);
+        assertThat(DBUtils.getLeaderEpochAtInit(pgContainer.getDataSource(), TOPIC_ID1, 0)).isEqualTo(7);
+
+        // Verify producer state was inserted
+        assertThat(DBUtils.getAllProducerState(pgContainer.getDataSource()))
+            .hasSize(2)
+            .extracting(ProducerStateRecord::getProducerId)
+            .containsExactlyInAnyOrder(2001L, 2002L);
+    }
+
+    @Test
+    void multiplePartitionsCanBeInitialized() {
+        runInitJob(Set.of(
+            new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 200L, 5, List.of()),
+            new InitDisklessLogRequest(TOPIC_ID1, TOPIC_1, 1, 50L, 100L, 3, List.of())
+        ));
+
+        final var logs = DBUtils.getAllLogs(pgContainer.getDataSource());
+        assertThat(logs).hasSize(2);
+
+        final var partition0 = logs.stream().filter(l -> l.getPartition() == 0).findFirst().orElseThrow();
+        assertThat(partition0.getLogStartOffset()).isEqualTo(100L);
+        assertThat(partition0.getHighWatermark()).isEqualTo(200L);
+
+        final var partition1 = logs.stream().filter(l -> l.getPartition() == 1).findFirst().orElseThrow();
+        assertThat(partition1.getLogStartOffset()).isEqualTo(50L);
+        assertThat(partition1.getHighWatermark()).isEqualTo(100L);
+    }
+}
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJobTest.java
index 7e818812c9..6a98ccbc9f 100644
--- a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJobTest.java
+++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJobTest.java
@@ -96,18 +96,18 @@ void createTopicsAndPartition() {
         final TopicsAndPartitionsCreateJob job1 = new TopicsAndPartitionsCreateJob(Time.SYSTEM, pgContainer.getJooqCtx(), createTopicAndPartitionsRequests, durationMs -> {});
         job1.run();
         assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
-            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L)
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L, 0L, null, 0),
+            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L, 0L, null, 0),
+            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L, 0L, null, 0)
         );
 
         // Repetition doesn't affect anything.
         final TopicsAndPartitionsCreateJob job2 = new TopicsAndPartitionsCreateJob(Time.SYSTEM, pgContainer.getJooqCtx(), createTopicAndPartitionsRequests, durationMs -> {});
         job2.run();
         assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
-            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L)
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L, 0L, null, 0),
+            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L, 0L, null, 0),
+            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L, 0L, null, 0)
         );
     }
 
@@ -128,10 +128,10 @@ void createPartitionAfterTopic() {
         job2.run();
 
         assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
-            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID2, 1, TOPIC_2, 0L, 0L, 0L)
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L, 0L, null, 0),
+            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L, 0L, null, 0),
+            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L, 0L, null, 0),
+            new LogsRecord(TOPIC_ID2, 1, TOPIC_2, 0L, 0L, 0L, 0L, null, 0)
         );
     }
 
@@ -157,9 +157,9 @@ void existingRecordsNotAffected() throws SQLException {
         job1.run();
 
         assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
-            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 101L, 201L, 999L),  // unaffected
-            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 102L, 202L, 1999L)  // unaffected
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 101L, 201L, 999L, 0L, null, 0),  // unaffected
+            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L, 0L, null, 0),
+            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 102L, 202L, 1999L, 0L, null, 0)  // unaffected
         );
     }
 }