diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 1c793038a2..71aad75a07 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2863,6 +2863,7 @@ class ReplicaManager(val config: KafkaConfig, inklessFetchOffsetHandler.foreach(_.close()) inklessRetentionEnforcer.foreach(_.close()) inklessFileCleaner.foreach(_.close()) + inklessDeleteRecordsInterceptor.foreach(_.close()) inklessSharedState.foreach(_.close()) info("Shut down completely") } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchOffsetHandler.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchOffsetHandler.java index e106bc2ae2..ff7f3a9eb3 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchOffsetHandler.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchOffsetHandler.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.message.ListOffsetsRequestData; import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; import org.apache.kafka.metadata.LeaderAndIsr; import org.apache.kafka.storage.internals.log.OffsetResultHolder; @@ -41,6 +42,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import io.aiven.inkless.TimeUtils; @@ -59,10 +61,25 @@ public class FetchOffsetHandler implements Closeable { private final InklessFetchOffsetMetrics metrics; public FetchOffsetHandler(SharedState state) { + this( + state, + Executors.newCachedThreadPool(new InklessThreadFactory("inkless-fetch-offset-metadata", false)), + state.time(), + new InklessFetchOffsetMetrics(state.time()) + ); + } + + // Visible for testing + FetchOffsetHandler( + final SharedState state, + final ExecutorService executor, + final Time time, + final InklessFetchOffsetMetrics metrics + ) { this.state = state; - this.executor = Executors.newCachedThreadPool(new InklessThreadFactory("inkless-fetch-offset-metadata", false)); - this.time = state.time(); - this.metrics = new InklessFetchOffsetMetrics(state.time()); + this.executor = executor; + this.time = time; + this.metrics = metrics; } public Job createJob() { @@ -71,7 +88,7 @@ public Job createJob() { @Override public void close() throws IOException { - executor.shutdown(); + ThreadUtils.shutdownExecutorServiceQuietly(executor, 5, TimeUnit.SECONDS); metrics.close(); } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java index 26024f93ec..7072175161 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java @@ -56,6 +56,18 @@ import io.github.bucket4j.Bandwidth; import io.github.bucket4j.Bucket; +/** + * Reader for fetching data from Inkless storage. + * + *

Thread Pool Lifecycle

+ *

This class manages thread pools for metadata fetching, data fetching, and optionally + * for lagging consumer requests. All pools must be shut down via {@link #close()}. + * + *

Design Note: Thread pools are created in the constructor arguments before delegation. + * If construction fails after pool creation (e.g., invalid lagging consumer configuration), + * the pools may leak. This is acceptable for broker startup components where failure prevents + * broker startup and JVM exit cleans up resources. + */ public class Reader implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(Reader.class); private static final long EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 5; diff --git a/storage/inkless/src/main/java/io/aiven/inkless/delete/DeleteRecordsInterceptor.java b/storage/inkless/src/main/java/io/aiven/inkless/delete/DeleteRecordsInterceptor.java index 299403c926..4119b19372 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/delete/DeleteRecordsInterceptor.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/delete/DeleteRecordsInterceptor.java @@ -21,18 +21,23 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.DeleteRecordsResponseData; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; +import io.aiven.inkless.common.InklessThreadFactory; import io.aiven.inkless.common.SharedState; import io.aiven.inkless.common.TopicIdEnricher; import io.aiven.inkless.common.TopicTypeCounter; @@ -43,22 +48,30 @@ import static org.apache.kafka.common.requests.DeleteRecordsResponse.INVALID_LOW_WATERMARK; -public class DeleteRecordsInterceptor { +public class DeleteRecordsInterceptor implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(DeleteRecordsInterceptor.class); private final ControlPlane controlPlane; private final MetadataView metadataView; - private final Executor executor; + private final ExecutorService executorService; private final TopicTypeCounter topicTypeCounter; public DeleteRecordsInterceptor(final SharedState state) { - this(state.controlPlane(), state.metadata(), Executors.newCachedThreadPool()); + this( + state.controlPlane(), + state.metadata(), + Executors.newCachedThreadPool(new InklessThreadFactory("inkless-delete-records", false)) + ); } // Visible for testing. - DeleteRecordsInterceptor(final ControlPlane controlPlane, final MetadataView metadataView, final Executor executor) { + DeleteRecordsInterceptor( + final ControlPlane controlPlane, + final MetadataView metadataView, + final ExecutorService executorService + ) { this.controlPlane = controlPlane; - this.executor = executor; + this.executorService = executorService; this.metadataView = metadataView; this.topicTypeCounter = new TopicTypeCounter(metadataView); } @@ -94,7 +107,7 @@ public boolean intercept(final Map offsetPerPartition, } // TODO use purgatory - executor.execute(() -> { + executorService.execute(() -> { try { final List requests = offsetPerPartitionEnriched.entrySet().stream() .map(kv -> new DeleteRecordsRequest(kv.getKey(), kv.getValue())) @@ -132,4 +145,9 @@ private void respondAllWithError(final Map offsetPerPartit )); responseCallback.accept(response); } + + @Override + public void close() throws IOException { + ThreadUtils.shutdownExecutorServiceQuietly(executorService, 5, TimeUnit.SECONDS); + } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java index 0f54613448..0b8d365997 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java @@ -54,7 +54,22 @@ /** * The file committer. * - *

It uploads files concurrently, but commits them to the control plan sequentially. + *

It uploads files concurrently, but commits them to the control plane sequentially. + * + *

Thread Pool Lifecycle

+ *

This class manages three thread pools for upload, commit, and cache storage operations. + * The pools are created during construction and must be shut down via {@link #close()}. + * + *

Design Note: Thread pools are created in the constructor arguments before delegation. + * If construction fails after pool creation (e.g., due to invalid arguments), the pools may leak. + * This is acceptable because: + *

+ *

Arguments are validated early in the delegated constructor to fail-fast before + * any significant work is done with the thread pools. */ class FileCommitter implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(FileCommitter.class); @@ -247,6 +262,7 @@ public void close() throws IOException { // Don't wait here, they should try to finish their work. executorServiceUpload.shutdown(); executorServiceCommit.shutdown(); + executorServiceCacheStore.shutdown(); metrics.close(); if (threadPoolMonitor != null) threadPoolMonitor.close(); } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java index a85c8d9847..3c1446f996 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java @@ -63,7 +63,16 @@ *

The Writer has the active file, the queue of files being uploaded. * It schedules commit ticks at the specified interval. * - *

The class is thread-safe: all the event entry points are protected with the lock.

+ *

The class is thread-safe: all the event entry points are protected with the lock. + * + *

Thread Pool Lifecycle

+ *

This class creates a scheduled executor for commit ticks and delegates to {@link FileCommitter} + * which manages additional thread pools. All pools must be shut down via {@link #close()}. + * + *

Design Note: Thread pools are created in the constructor arguments before delegation. + * If construction fails after pool creation, the pools may leak. This is acceptable for broker + * startup components where failure prevents broker startup and JVM exit cleans up resources. + * See {@link FileCommitter} for details. */ class Writer implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(Writer.class); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchOffsetHandlerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchOffsetHandlerTest.java index 11dcaf12db..d9a702f0e7 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchOffsetHandlerTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchOffsetHandlerTest.java @@ -37,11 +37,13 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import java.io.IOException; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import io.aiven.inkless.control_plane.ControlPlane; import io.aiven.inkless.control_plane.ListOffsetsRequest; @@ -201,4 +203,17 @@ void cancellation(final boolean cancelBeforeStart) { verify(submittedFuture).cancel(eq(true)); } + + @Test + void closeShutdownsExecutorService() throws IOException, InterruptedException { + when(executor.awaitTermination(5, TimeUnit.SECONDS)).thenReturn(true); + + final InklessFetchOffsetMetrics metricsToClose = new InklessFetchOffsetMetrics(time); + final FetchOffsetHandler handler = new FetchOffsetHandler(null, executor, time, metricsToClose); + + handler.close(); + + verify(executor).shutdown(); + verify(executor).awaitTermination(5, TimeUnit.SECONDS); + } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java b/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java index 7ee6ec50aa..af6b2ead66 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java @@ -32,8 +32,10 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import io.aiven.inkless.control_plane.ControlPlane; @@ -61,6 +63,8 @@ class DeleteRecordsInterceptorTest { ControlPlane controlPlane; @Mock Consumer> responseCallback; + @Mock + ExecutorService executorService; @Captor ArgumentCaptor> resultCaptor; @@ -68,7 +72,7 @@ class DeleteRecordsInterceptorTest { ArgumentCaptor> deleteRecordsCaptor; @Test - public void mixingDisklessAndClassicTopicsIsNotAllowed() { + public void mixingDisklessAndClassicTopicsIsNotAllowed() throws Exception { when(metadataView.isDisklessTopic(eq("diskless"))).thenReturn(true); when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false); @@ -98,10 +102,11 @@ public void mixingDisklessAndClassicTopicsIsNotAllowed() { .setLowWatermark(INVALID_LOW_WATERMARK) )); verify(controlPlane, never()).deleteRecords(any()); + interceptor.close(); } @Test - public void notInterceptDeletingRecordsFromClassicTopics() { + public void notInterceptDeletingRecordsFromClassicTopics() throws Exception { when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false); final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(controlPlane, metadataView, new SynchronousExecutor()); @@ -114,10 +119,11 @@ public void notInterceptDeletingRecordsFromClassicTopics() { assertThat(result).isFalse(); verify(responseCallback, never()).accept(any()); verify(controlPlane, never()).deleteRecords(any()); + interceptor.close(); } @Test - public void interceptDeletingRecordsFromDisklessTopics() { + public void interceptDeletingRecordsFromDisklessTopics() throws Exception { final Uuid topicId = new Uuid(1, 2); when(metadataView.isDisklessTopic(eq("diskless"))).thenReturn(true); when(metadataView.getTopicId(eq("diskless"))).thenReturn(topicId); @@ -170,10 +176,11 @@ public void interceptDeletingRecordsFromDisklessTopics() { .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code()) .setLowWatermark(INVALID_LOW_WATERMARK) )); + interceptor.close(); } @Test - public void controlPlaneException() { + public void controlPlaneException() throws Exception { final Uuid topicId = new Uuid(1, 2); when(metadataView.isDisklessTopic(eq("diskless"))).thenReturn(true); when(metadataView.getTopicId(eq("diskless"))).thenReturn(topicId); @@ -201,10 +208,11 @@ public void controlPlaneException() { .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) .setLowWatermark(INVALID_LOW_WATERMARK) )); + interceptor.close(); } @Test - public void topicIdNotFound() { + public void topicIdNotFound() throws Exception { when(metadataView.isDisklessTopic(eq("diskless1"))).thenReturn(true); when(metadataView.isDisklessTopic(eq("diskless2"))).thenReturn(true); // This instead of the normal thenReturn to not depend on the map key iteration order @@ -244,5 +252,18 @@ public void topicIdNotFound() { .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) .setLowWatermark(INVALID_LOW_WATERMARK) )); + interceptor.close(); + } + + @Test + public void closeShutdownsExecutorService() throws IOException, InterruptedException { + when(executorService.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)).thenReturn(true); + + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(controlPlane, metadataView, executorService); + + interceptor.close(); + + verify(executorService).shutdown(); + verify(executorService).awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS); } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java index e89b18c3c0..56b43cc787 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java @@ -277,6 +277,7 @@ void close() throws IOException { verify(executorServiceUpload).shutdown(); verify(executorServiceCommit).shutdown(); + verify(executorServiceCacheStore).shutdown(); verify(metrics).close(); }