1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2863,6 +2863,7 @@ class ReplicaManager(val config: KafkaConfig,
inklessFetchOffsetHandler.foreach(_.close())
inklessRetentionEnforcer.foreach(_.close())
inklessFileCleaner.foreach(_.close())
inklessDeleteRecordsInterceptor.foreach(_.close())
inklessSharedState.foreach(_.close())
info("Shut down completely")
}
@@ -22,6 +22,7 @@
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.LeaderAndIsr;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
@@ -41,6 +42,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import io.aiven.inkless.TimeUtils;
@@ -59,10 +61,25 @@ public class FetchOffsetHandler implements Closeable {
private final InklessFetchOffsetMetrics metrics;

public FetchOffsetHandler(SharedState state) {
this(
state,
Executors.newCachedThreadPool(new InklessThreadFactory("inkless-fetch-offset-metadata", false)),
state.time(),
new InklessFetchOffsetMetrics(state.time())
);
}

// Visible for testing
FetchOffsetHandler(
final SharedState state,
final ExecutorService executor,
final Time time,
final InklessFetchOffsetMetrics metrics
) {
this.state = state;
this.executor = Executors.newCachedThreadPool(new InklessThreadFactory("inkless-fetch-offset-metadata", false));
this.time = state.time();
this.metrics = new InklessFetchOffsetMetrics(state.time());
this.executor = executor;
this.time = time;
this.metrics = metrics;
}

public Job createJob() {
@@ -71,7 +88,7 @@ public Job createJob() {

@Override
public void close() throws IOException {
executor.shutdown();
ThreadUtils.shutdownExecutorServiceQuietly(executor, 5, TimeUnit.SECONDS);
metrics.close();
}

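For context on the close() change above: ThreadUtils.shutdownExecutorServiceQuietly(executor, 5, TimeUnit.SECONDS) replaces the bare executor.shutdown(), so close() now waits a bounded amount of time for in-flight work. The helper's internals are not part of this diff; the following is only a minimal sketch of the bounded-shutdown behaviour being relied on, with shutdownQuietly as a hypothetical stand-in name:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ShutdownSketch {
    static void shutdownQuietly(final ExecutorService executor, final long timeout, final TimeUnit unit) {
        executor.shutdown();                         // stop accepting new tasks
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                executor.shutdownNow();              // timeout elapsed: cancel whatever is still running
            }
        } catch (final InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();      // preserve the caller's interrupt status
        }
    }
}

This matches what FetchOffsetHandlerTest asserts further down in this diff: shutdown() is invoked first, then awaitTermination(5, TimeUnit.SECONDS).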
12 changes: 12 additions & 0 deletions storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java
@@ -56,6 +56,18 @@
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;

/**
* Reader for fetching data from Inkless storage.
*
* <h2>Thread Pool Lifecycle</h2>
* <p>This class manages thread pools for metadata fetching, data fetching, and optionally
* for lagging consumer requests. All pools must be shut down via {@link #close()}.
*
* <p><b>Design Note:</b> Thread pools are created in the constructor arguments before delegation.
* If construction fails after pool creation (e.g., invalid lagging consumer configuration),
* the pools may leak. This is acceptable for broker startup components where failure prevents
* broker startup and JVM exit cleans up resources.
*/
public class Reader implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(Reader.class);
private static final long EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 5;
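The Design Note above (restated on FileCommitter and Writer below) concerns pools built inside the arguments of a this(...) delegation. A hypothetical, self-contained illustration of that leak window and the fail-fast validation the notes mention; PoolOwner and its members are made-up names, and only ThreadUtils and InklessThreadFactory are taken from this change:

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.utils.ThreadUtils;

import io.aiven.inkless.common.InklessThreadFactory;

public class PoolOwner implements AutoCloseable {
    private final ExecutorService pool;

    public PoolOwner(final Object config) {
        // The pool is created while the this(...) arguments are evaluated, i.e. before
        // the delegated constructor gets any chance to reject a bad configuration.
        this(config, Executors.newCachedThreadPool(new InklessThreadFactory("example-pool", false)));
    }

    // Visible for testing, mirroring the constructors in this change.
    PoolOwner(final Object config, final ExecutorService pool) {
        Objects.requireNonNull(config, "config");  // fail-fast; if this throws, the pool above is never closed
        this.pool = pool;
    }

    @Override
    public void close() {
        ThreadUtils.shutdownExecutorServiceQuietly(pool, 5, TimeUnit.SECONDS);
    }
}

As the javadoc argues, that leak can only happen while broker startup is already failing, so JVM exit reclaims the threads.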
@@ -21,18 +21,23 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DeleteRecordsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.ThreadUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import io.aiven.inkless.common.InklessThreadFactory;
import io.aiven.inkless.common.SharedState;
import io.aiven.inkless.common.TopicIdEnricher;
import io.aiven.inkless.common.TopicTypeCounter;
@@ -43,22 +48,30 @@

import static org.apache.kafka.common.requests.DeleteRecordsResponse.INVALID_LOW_WATERMARK;

public class DeleteRecordsInterceptor {
public class DeleteRecordsInterceptor implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(DeleteRecordsInterceptor.class);

private final ControlPlane controlPlane;
private final MetadataView metadataView;
private final Executor executor;
private final ExecutorService executorService;
private final TopicTypeCounter topicTypeCounter;

public DeleteRecordsInterceptor(final SharedState state) {
this(state.controlPlane(), state.metadata(), Executors.newCachedThreadPool());
this(
state.controlPlane(),
state.metadata(),
Executors.newCachedThreadPool(new InklessThreadFactory("inkless-delete-records", false))
);
}

// Visible for testing.
DeleteRecordsInterceptor(final ControlPlane controlPlane, final MetadataView metadataView, final Executor executor) {
DeleteRecordsInterceptor(
final ControlPlane controlPlane,
final MetadataView metadataView,
final ExecutorService executorService
) {
this.controlPlane = controlPlane;
this.executor = executor;
this.executorService = executorService;
this.metadataView = metadataView;
this.topicTypeCounter = new TopicTypeCounter(metadataView);
}
@@ -94,7 +107,7 @@ public boolean intercept(final Map<TopicPartition, Long> offsetPerPartition,
}

// TODO use purgatory
executor.execute(() -> {
executorService.execute(() -> {
try {
final List<DeleteRecordsRequest> requests = offsetPerPartitionEnriched.entrySet().stream()
.map(kv -> new DeleteRecordsRequest(kv.getKey(), kv.getValue()))
@@ -132,4 +145,9 @@ private void respondAllWithError(final Map<TopicPartition, Long> offsetPerPartit
));
responseCallback.accept(response);
}

@Override
public void close() throws IOException {
ThreadUtils.shutdownExecutorServiceQuietly(executorService, 5, TimeUnit.SECONDS);
}
}
@@ -54,7 +54,22 @@
/**
* The file committer.
*
* <p>It uploads files concurrently, but commits them to the control plan sequentially.
* <p>It uploads files concurrently, but commits them to the control plane sequentially.
*
* <h2>Thread Pool Lifecycle</h2>
* <p>This class manages three thread pools for upload, commit, and cache storage operations.
* The pools are created during construction and must be shut down via {@link #close()}.
*
* <p><b>Design Note:</b> Thread pools are created in the constructor arguments before delegation.
* If construction fails after pool creation (e.g., due to invalid arguments), the pools may leak.
* This is acceptable because:
* <ul>
* <li>This is a broker startup component - construction failure prevents broker startup</li>
* <li>The JVM would exit anyway, cleaning up all threads</li>
* <li>Failure scenarios are low-probability edge cases (null arguments, OOM)</li>
* </ul>
* <p>Arguments are validated early in the delegated constructor to fail-fast before
* any significant work is done with the thread pools.
*/
class FileCommitter implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(FileCommitter.class);
@@ -247,6 +262,7 @@ public void close() throws IOException {
// Don't wait here, they should try to finish their work.
executorServiceUpload.shutdown();
executorServiceCommit.shutdown();
executorServiceCacheStore.shutdown();
metrics.close();
if (threadPoolMonitor != null) threadPoolMonitor.close();
}
@@ -63,7 +63,16 @@
* <p>The Writer has the active file, the queue of files being uploaded.
* It schedules commit ticks at the specified interval.
*
* <p>The class is thread-safe: all the event entry points are protected with the lock.</p>
* <p>The class is thread-safe: all the event entry points are protected with the lock.
*
* <h2>Thread Pool Lifecycle</h2>
* <p>This class creates a scheduled executor for commit ticks and delegates to {@link FileCommitter}
* which manages additional thread pools. All pools must be shut down via {@link #close()}.
*
* <p><b>Design Note:</b> Thread pools are created in the constructor arguments before delegation.
* If construction fails after pool creation, the pools may leak. This is acceptable for broker
* startup components where failure prevents broker startup and JVM exit cleans up resources.
* See {@link FileCommitter} for details.
*/
class Writer implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(Writer.class);
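The Writer javadoc above mentions a scheduled executor for commit ticks; that wiring is outside this diff. One common shape for such a tick loop, sketched with hypothetical names (CommitTicker, commitIntervalMs, tick) and making no claim about the actual Writer fields:

import java.io.Closeable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.utils.ThreadUtils;

final class CommitTicker implements Closeable {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    void start(final long commitIntervalMs) {
        // Fire a commit tick at a fixed interval; each tick would rotate the active
        // file and hand it to the committer (details elided in this sketch).
        scheduler.scheduleAtFixedRate(this::tick, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
    }

    private void tick() {
        // placeholder for the actual commit-tick work
    }

    @Override
    public void close() {
        // Same bounded shutdown as elsewhere in this change.
        ThreadUtils.shutdownExecutorServiceQuietly(scheduler, 5, TimeUnit.SECONDS);
    }
}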
@@ -37,11 +37,13 @@
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import io.aiven.inkless.control_plane.ControlPlane;
import io.aiven.inkless.control_plane.ListOffsetsRequest;
@@ -201,4 +203,17 @@ void cancellation(final boolean cancelBeforeStart) {

verify(submittedFuture).cancel(eq(true));
}

@Test
void closeShutdownsExecutorService() throws IOException, InterruptedException {
when(executor.awaitTermination(5, TimeUnit.SECONDS)).thenReturn(true);

final InklessFetchOffsetMetrics metricsToClose = new InklessFetchOffsetMetrics(time);
final FetchOffsetHandler handler = new FetchOffsetHandler(null, executor, time, metricsToClose);

handler.close();

verify(executor).shutdown();
verify(executor).awaitTermination(5, TimeUnit.SECONDS);
}
}
@@ -32,8 +32,10 @@
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

import io.aiven.inkless.control_plane.ControlPlane;
@@ -61,14 +63,16 @@ class DeleteRecordsInterceptorTest {
ControlPlane controlPlane;
@Mock
Consumer<Map<TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult>> responseCallback;
@Mock
ExecutorService executorService;

@Captor
ArgumentCaptor<Map<TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult>> resultCaptor;
@Captor
ArgumentCaptor<List<DeleteRecordsRequest>> deleteRecordsCaptor;

@Test
public void mixingDisklessAndClassicTopicsIsNotAllowed() {
public void mixingDisklessAndClassicTopicsIsNotAllowed() throws Exception {
when(metadataView.isDisklessTopic(eq("diskless"))).thenReturn(true);
when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false);

@@ -98,10 +102,11 @@ public void mixingDisklessAndClassicTopicsIsNotAllowed() {
.setLowWatermark(INVALID_LOW_WATERMARK)
));
verify(controlPlane, never()).deleteRecords(any());
interceptor.close();
}

@Test
public void notInterceptDeletingRecordsFromClassicTopics() {
public void notInterceptDeletingRecordsFromClassicTopics() throws Exception {
when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false);

final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(controlPlane, metadataView, new SynchronousExecutor());
@@ -114,10 +119,11 @@ public void notInterceptDeletingRecordsFromClassicTopics() {
assertThat(result).isFalse();
verify(responseCallback, never()).accept(any());
verify(controlPlane, never()).deleteRecords(any());
interceptor.close();
}

@Test
public void interceptDeletingRecordsFromDisklessTopics() {
public void interceptDeletingRecordsFromDisklessTopics() throws Exception {
final Uuid topicId = new Uuid(1, 2);
when(metadataView.isDisklessTopic(eq("diskless"))).thenReturn(true);
when(metadataView.getTopicId(eq("diskless"))).thenReturn(topicId);
@@ -170,10 +176,11 @@ public void interceptDeletingRecordsFromDisklessTopics() {
.setErrorCode(Errors.KAFKA_STORAGE_ERROR.code())
.setLowWatermark(INVALID_LOW_WATERMARK)
));
interceptor.close();
}

@Test
public void controlPlaneException() {
public void controlPlaneException() throws Exception {
final Uuid topicId = new Uuid(1, 2);
when(metadataView.isDisklessTopic(eq("diskless"))).thenReturn(true);
when(metadataView.getTopicId(eq("diskless"))).thenReturn(topicId);
@@ -201,10 +208,11 @@ public void controlPlaneException() {
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setLowWatermark(INVALID_LOW_WATERMARK)
));
interceptor.close();
}

@Test
public void topicIdNotFound() {
public void topicIdNotFound() throws Exception {
when(metadataView.isDisklessTopic(eq("diskless1"))).thenReturn(true);
when(metadataView.isDisklessTopic(eq("diskless2"))).thenReturn(true);
// This instead of the normal thenReturn to not depend on the map key iteration order
@@ -244,5 +252,18 @@ public void topicIdNotFound() {
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setLowWatermark(INVALID_LOW_WATERMARK)
));
interceptor.close();
}

@Test
public void closeShutdownsExecutorService() throws IOException, InterruptedException {
when(executorService.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)).thenReturn(true);

final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(controlPlane, metadataView, executorService);

interceptor.close();

verify(executorService).shutdown();
verify(executorService).awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS);
}
}
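The tests above hand a SynchronousExecutor to the interceptor; its implementation is not part of this diff. Presumably it is an ExecutorService that runs each task on the calling thread, which is what lets the tests assert on the asynchronous execute(...) path without waiting. A hypothetical sketch of such a helper (SameThreadExecutorService is a made-up name):

import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

final class SameThreadExecutorService extends AbstractExecutorService {
    private volatile boolean shutdown = false;

    @Override public void execute(final Runnable command) { command.run(); }  // run inline on the caller's thread
    @Override public void shutdown() { shutdown = true; }
    @Override public List<Runnable> shutdownNow() { shutdown = true; return Collections.emptyList(); }
    @Override public boolean isShutdown() { return shutdown; }
    @Override public boolean isTerminated() { return shutdown; }
    @Override public boolean awaitTermination(final long timeout, final TimeUnit unit) { return shutdown; }
}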
@@ -277,6 +277,7 @@ void close() throws IOException {

verify(executorServiceUpload).shutdown();
verify(executorServiceCommit).shutdown();
verify(executorServiceCacheStore).shutdown();
verify(metrics).close();
}
