From c5c3f2ffeba16c1a3bbbec5f0f97688c2e3947c9 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 26 Dec 2025 16:47:58 +0200 Subject: [PATCH] refactor(inkless): SharedState to own StorageBackends Properly separating storage back-end clients between read/write and background jobs. SharedState now owns the StorageBackends lifecycle, properly closing all of them. So, Reader and Writer component do not have to close storages anymore. Alongside, making background components internal constructors visible for easier testing. This allows to make internal SharedState constructor private. --- .../kafka/server/ReplicaManagerTest.scala | 33 ++-- .../io/aiven/inkless/common/SharedState.java | 153 ++++++++++++++---- .../aiven/inkless/consume/FetchHandler.java | 19 +-- .../java/io/aiven/inkless/consume/Reader.java | 57 ++++--- .../delete/DeleteRecordsInterceptor.java | 18 ++- .../io/aiven/inkless/delete/FileCleaner.java | 4 +- .../io/aiven/inkless/merge/FileMerger.java | 35 +++- .../aiven/inkless/produce/AppendHandler.java | 2 +- .../java/io/aiven/inkless/produce/Writer.java | 21 +-- .../in_memory/InMemoryStorage.java | 5 +- .../aiven/inkless/common/SharedStateTest.java | 122 ++++++++++++++ .../io/aiven/inkless/consume/ReaderTest.java | 37 +++-- .../delete/DeleteRecordsInterceptorTest.java | 68 ++------ .../delete/FileCleanerIntegrationTest.java | 8 - .../inkless/delete/FileCleanerMockedTest.java | 9 ++ .../merge/FileMergerIntegrationTest.java | 10 +- .../inkless/merge/FileMergerMockedTest.java | 49 ++---- .../inkless/produce/AppendHandlerTest.java | 56 +++---- .../inkless/produce/WriterMockedTest.java | 47 +++--- .../inkless/produce/WriterPropertyTest.java | 1 - .../in_memory/InMemoryStorageTest.java | 19 +-- 21 files changed, 469 insertions(+), 304 deletions(-) create mode 100644 storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 3e6dcc78cc..a736834c99 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -91,7 +91,7 @@ import org.mockito.ArgumentMatchers._ import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer -import org.mockito.{Answers, ArgumentCaptor, ArgumentMatchers, MockedConstruction} +import org.mockito.{ArgumentCaptor, ArgumentMatchers, MockedConstruction} import java.io.{ByteArrayInputStream, File} import java.net.InetAddress @@ -6495,7 +6495,7 @@ class ReplicaManagerTest { } @Test - def testAppendWithInvalidDisklessAndValidCLassic(): Unit = { + def testAppendWithInvalidDisklessAndValidClassic(): Unit = { val entriesPerPartition = Map( disklessTopicPartition -> RECORDS, classicTopicPartition -> RECORDS, @@ -6643,7 +6643,7 @@ class ReplicaManagerTest { val replicaManager = try { createReplicaManager( List(disklessTopicPartition.topic(), disklessTopicPartition2.topic()), - controlPlane = Some(cp), + controlPlaneOption = Some(cp), topicIdMapping = Map(disklessTopicPartition2.topic() -> disklessTopicPartition2.topicId()) ) } finally { @@ -6706,7 +6706,7 @@ class ReplicaManagerTest { val fetchHandlerCtor: MockedConstruction[FetchHandler] = mockFetchHandler(disklessResponse) val replicaManager = try { - createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp)) + createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp)) } finally { fetchHandlerCtor.close() } @@ -6765,7 +6765,7 @@ class ReplicaManagerTest { val replicaManager = try { // spy to inject readFromLog mock - spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp))) + spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp))) } finally { fetchHandlerCtor.close() } @@ -6845,7 +6845,7 @@ class ReplicaManagerTest { val replicaManager = try { // spy to inject readFromLog mock - spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp))) + spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp))) } finally { fetchHandlerCtor.close() } @@ -6919,7 +6919,7 @@ class ReplicaManagerTest { val fetchHandlerCtor: MockedConstruction[FetchHandler] = mockFetchHandler(disklessResponse) val replicaManager = try { - createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp)) + createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp)) } finally { fetchHandlerCtor.close() } @@ -6982,7 +6982,7 @@ class ReplicaManagerTest { val fetchHandlerCtor: MockedConstruction[FetchHandler] = mockFetchHandler(disklessResponse) val replicaManager = try { - spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp))) + spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp))) } finally { fetchHandlerCtor.close() } @@ -7057,7 +7057,7 @@ class ReplicaManagerTest { val fetchHandlerCtor: MockedConstruction[FetchHandler] = mockFetchHandler(disklessResponse) val replicaManager = try { - spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp))) + spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp))) } finally { fetchHandlerCtor.close() } @@ -7130,16 +7130,17 @@ class ReplicaManagerTest { private def createReplicaManager( disklessTopics: Seq[String], - controlPlane: Option[ControlPlane] = None, + controlPlaneOption: Option[ControlPlane] = None, topicIdMapping: Map[String, Uuid] = Map.empty ): ReplicaManager = { - val props = TestUtils.createBrokerConfig(1, logDirCount = 2) + val brokerId = 1 + val props = TestUtils.createBrokerConfig(brokerId, logDirCount = 2) val config = KafkaConfig.fromProps(props) val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(new Properties())) - val sharedState = mock(classOf[SharedState], Answers.RETURNS_DEEP_STUBS) - when(sharedState.time()).thenReturn(Time.SYSTEM) - when(sharedState.config()).thenReturn(new InklessConfig(new util.HashMap[String, Object]())) - when(sharedState.controlPlane()).thenReturn(controlPlane.getOrElse(mock(classOf[ControlPlane]))) + val inklessConfigProps = new util.HashMap[String, Object]() + inklessConfigProps.put(InklessConfig.CONSUME_BATCH_COORDINATE_CACHE_ENABLED_CONFIG, java.lang.Boolean.FALSE) + val inklessConfig = new InklessConfig(inklessConfigProps) + val controlPlane = controlPlaneOption.getOrElse(mock(classOf[ControlPlane])) val inklessMetadata = mock(classOf[MetadataView]) when(inklessMetadata.isDisklessTopic(any())).thenReturn(false) when(inklessMetadata.getTopicId(anyString())).thenAnswer{ invocation => @@ -7147,7 +7148,7 @@ class ReplicaManagerTest { topicIdMapping.getOrElse(topicName, Uuid.ZERO_UUID) } disklessTopics.foreach(t => when(inklessMetadata.isDisklessTopic(t)).thenReturn(true)) - when(sharedState.metadata()).thenReturn(inklessMetadata) + val sharedState = SharedState.initialize(time, brokerId, inklessConfig, inklessMetadata, controlPlane, new BrokerTopicStats(), () => new LogConfig(new Properties())) val logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) diff --git a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java index aa817b2bd4..76bf46c83a 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; @@ -30,6 +31,7 @@ import java.io.IOException; import java.time.Duration; import java.util.List; +import java.util.Optional; import java.util.function.Supplier; import io.aiven.inkless.cache.BatchCoordinateCache; @@ -52,6 +54,18 @@ public final class SharedState implements Closeable { private final InklessConfig config; private final MetadataView metadata; private final ControlPlane controlPlane; + private final StorageBackend fetchStorage; + // Separate storage client for lagging consumers to: + // 1. Isolate connection pool usage (lagging consumers shouldn't exhaust connections for hot path) + // 2. Allow independent tuning of timeouts/retries for cold storage access patterns + // (This requires some refactoring on how the storage client is built/configured) + private final Optional maybeLaggingFetchStorage; + private final StorageBackend produceStorage; + // backgroundStorage is shared by FileCleaner and FileMerger executors which run concurrently. + // Kafka storage backends are required to be thread-safe (they share the same Metrics instance). + // However, these tasks perform high-latency object storage calls and retries. A dedicated backend + // instance guarantees they don't contend with hot-path fetch/produce clients and prevents threading/double-close issues. + private final StorageBackend backgroundStorage; private final ObjectKeyCreator objectKeyCreator; private final KeyAlignmentStrategy keyAlignmentStrategy; private final ObjectCache cache; @@ -60,12 +74,17 @@ public final class SharedState implements Closeable { private final Supplier defaultTopicConfigs; private final Metrics storageMetrics; - public SharedState( + private SharedState( final Time time, final int brokerId, final InklessConfig config, final MetadataView metadata, final ControlPlane controlPlane, + final StorageBackend fetchStorage, + final Optional maybeLaggingFetchStorage, + final StorageBackend produceStorage, + final StorageBackend backgroundStorage, + final Metrics storageMetrics, final ObjectKeyCreator objectKeyCreator, final KeyAlignmentStrategy keyAlignmentStrategy, final ObjectCache cache, @@ -84,12 +103,11 @@ public SharedState( this.batchCoordinateCache = batchCoordinateCache; this.brokerTopicStats = brokerTopicStats; this.defaultTopicConfigs = defaultTopicConfigs; - - final MetricsReporter reporter = new JmxReporter(); - this.storageMetrics = new Metrics( - new MetricConfig(), List.of(reporter), Time.SYSTEM, - new KafkaMetricsContext(STORAGE_METRIC_CONTEXT) - ); + this.fetchStorage = fetchStorage; + this.maybeLaggingFetchStorage = maybeLaggingFetchStorage; + this.produceStorage = produceStorage; + this.backgroundStorage = backgroundStorage; + this.storageMetrics = storageMetrics; } public static SharedState initialize( @@ -107,34 +125,91 @@ public static SharedState initialize( "Value of consume.batch.coordinate.cache.ttl.ms exceeds file.cleaner.retention.period.ms / 2" ); } - return new SharedState( - time, - brokerId, - config, - metadata, - controlPlane, - ObjectKey.creator(config.objectKeyPrefix(), config.objectKeyLogPrefixMasked()), - new FixedBlockAlignment(config.fetchCacheBlockBytes()), - new CaffeineCache( + + CaffeineCache objectCache = null; + BatchCoordinateCache batchCoordinateCache = null; + StorageBackend fetchStorage = null; + StorageBackend laggingFetchStorage = null; + StorageBackend produceStorage = null; + StorageBackend backgroundStorage = null; + Metrics storageMetrics = null; + try { + objectCache = new CaffeineCache( config.cacheMaxCount(), config.cacheExpirationLifespanSec(), config.cacheExpirationMaxIdleSec() - ), - config.isBatchCoordinateCacheEnabled() ? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) : new NullBatchCoordinateCache(), - brokerTopicStats, - defaultTopicConfigs - ); + ); + batchCoordinateCache = config.isBatchCoordinateCacheEnabled() + ? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) + : new NullBatchCoordinateCache(); + + final MetricsReporter reporter = new JmxReporter(); + storageMetrics = new Metrics( + new MetricConfig(), List.of(reporter), Time.SYSTEM, + new KafkaMetricsContext(STORAGE_METRIC_CONTEXT) + ); + fetchStorage = config.storage(storageMetrics); + // If thread pool size is 0, disabling lagging consumer support, don't create a separate client + // + // NOTE: The client for lagging consumers is created only when this SharedState + // is constructed. If fetchLaggingConsumerThreadPoolSize() is 0 at this time, no separate + // client is created and lagging consumer support is effectively disabled for the lifetime + // of this instance, even if the configuration is later reloaded with a non-zero value. + // Enabling lagging consumer support therefore requires a broker restart (or reconstruction + // of the SharedState) so that a new storage client can be created. + laggingFetchStorage = config.fetchLaggingConsumerThreadPoolSize() > 0 ? config.storage(storageMetrics) : null; + produceStorage = config.storage(storageMetrics); + backgroundStorage = config.storage(storageMetrics); + final var objectKeyCreator = ObjectKey.creator(config.objectKeyPrefix(), config.objectKeyLogPrefixMasked()); + final var keyAlignmentStrategy = new FixedBlockAlignment(config.fetchCacheBlockBytes()); + return new SharedState( + time, + brokerId, + config, + metadata, + controlPlane, + fetchStorage, + Optional.ofNullable(laggingFetchStorage), + produceStorage, + backgroundStorage, + storageMetrics, + objectKeyCreator, + keyAlignmentStrategy, + objectCache, + batchCoordinateCache, + brokerTopicStats, + defaultTopicConfigs + ); + } catch (Exception e) { + // Closing storage backends + Utils.closeQuietly(backgroundStorage, "backgroundStorage"); + Utils.closeQuietly(produceStorage, "produceStorage"); + Utils.closeQuietly(fetchStorage, "fetchStorage"); + Utils.closeQuietly(laggingFetchStorage, "laggingFetchStorage"); + // Closing storage metrics + Utils.closeQuietly(storageMetrics, "storageMetrics"); + // Closing caches + Utils.closeQuietly(batchCoordinateCache, "batchCoordinateCache"); + Utils.closeQuietly(objectCache, "objectCache"); + + throw new RuntimeException("Failed to initialize SharedState", e); + } } @Override public void close() throws IOException { - try { - cache.close(); - controlPlane.close(); - storageMetrics.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } + // Closing storage backends + Utils.closeQuietly(backgroundStorage, "backgroundStorage"); + Utils.closeQuietly(produceStorage, "produceStorage"); + maybeLaggingFetchStorage.ifPresent(s -> Utils.closeQuietly(s, "laggingFetchStorage")); + Utils.closeQuietly(fetchStorage, "fetchStorage"); + // Closing storage metrics + Utils.closeQuietly(storageMetrics, "storageMetrics"); + // Closing caches + Utils.closeQuietly(cache, "objectCache"); + Utils.closeQuietly(batchCoordinateCache, "batchCoordinateCache"); + // Closing control plane + Utils.closeQuietly(controlPlane, "controlPlane"); } public Time time() { @@ -185,7 +260,25 @@ public Supplier defaultTopicConfigs() { return defaultTopicConfigs; } - public StorageBackend buildStorage() { - return config.storage(storageMetrics); + public StorageBackend fetchStorage() { + return fetchStorage; + } + + /** + * Optional access to the lagging fetch storage backend. + * + *

When {@code fetch.lagging.consumer.thread.pool.size == 0}, the lagging consumer + * path is explicitly disabled and this storage backend is not created.

+ */ + public Optional maybeLaggingFetchStorage() { + return maybeLaggingFetchStorage; + } + + public StorageBackend produceStorage() { + return produceStorage; + } + + public StorageBackend backgroundStorage() { + return backgroundStorage; } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java index ee1c80c5ce..75caca6476 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java @@ -51,23 +51,11 @@ public FetchHandler(final SharedState state) { state.keyAlignmentStrategy(), state.cache(), state.controlPlane(), - state.buildStorage(), + state.fetchStorage(), state.brokerTopicStats(), state.config().fetchMetadataThreadPoolSize(), state.config().fetchDataThreadPoolSize(), - // Separate storage client for lagging consumers to: - // 1. Isolate connection pool usage (lagging consumers shouldn't exhaust connections for hot path) - // 2. Allow independent tuning of timeouts/retries for cold storage access patterns - // (This requires some refactoring on how the storage client is built/configured) - // If thread pool size is 0, disabling lagging consumer support, don't create a separate client - // - // NOTE: The client for lagging consumers is created only when this FetchHandler (and Reader) - // is constructed. If fetchLaggingConsumerThreadPoolSize() is 0 at this time, no separate - // client is created and lagging consumer support is effectively disabled for the lifetime - // of this instance, even if the configuration is later reloaded with a non-zero value. - // Enabling lagging consumer support therefore requires a broker restart (or reconstruction - // of the SharedState/FetchHandler) so that a new storage client can be created. - state.config().fetchLaggingConsumerThreadPoolSize() > 0 ? state.buildStorage() : null, + state.maybeLaggingFetchStorage(), state.config().fetchLaggingConsumerThresholdMs(), state.config().fetchLaggingConsumerRequestRateLimit(), state.config().fetchLaggingConsumerThreadPoolSize(), @@ -76,7 +64,8 @@ public FetchHandler(final SharedState state) { ); } - public FetchHandler(final Reader reader) { + // Visible for testing + FetchHandler(final Reader reader) { this.reader = reader; } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java index 80228c6811..26024f93ec 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java @@ -33,6 +33,7 @@ import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -51,6 +52,7 @@ import io.aiven.inkless.control_plane.ControlPlane; import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; +import io.aiven.inkless.storage_backend.common.StorageBackend; import io.github.bucket4j.Bandwidth; import io.github.bucket4j.Bucket; @@ -107,7 +109,7 @@ public Reader( BrokerTopicStats brokerTopicStats, int fetchMetadataThreadPoolSize, int fetchDataThreadPoolSize, - ObjectFetcher laggingObjectFetcher, + Optional maybeLaggingObjectFetcher, long laggingConsumerThresholdMs, int laggingConsumerRequestRateLimit, int laggingConsumerThreadPoolSize, @@ -125,33 +127,34 @@ public Reader( Executors.newFixedThreadPool(fetchDataThreadPoolSize, new InklessThreadFactory("inkless-fetch-data-", false)), // Only create lagging consumer fetcher when feature is enabled (pool size > 0). // A pool size of 0 is a valid configuration that explicitly disables the feature (null fetcher and executor). - laggingConsumerThreadPoolSize > 0 ? laggingObjectFetcher : null, + maybeLaggingObjectFetcher, laggingConsumerThresholdMs, laggingConsumerRequestRateLimit, // Only create lagging consumer resources when feature is enabled (pool size > 0). // A pool size of 0 is a valid configuration that explicitly disables the feature // by passing both a null executor and a null laggingObjectFetcher. - laggingConsumerThreadPoolSize > 0 - ? createBoundedThreadPool(laggingConsumerThreadPoolSize) - : null, + maybeCreateBoundedThreadPool(laggingConsumerThreadPoolSize), new InklessFetchMetrics(time, cache), brokerTopicStats ); } - private static ExecutorService createBoundedThreadPool(int poolSize) { + private static Optional maybeCreateBoundedThreadPool(int poolSize) { // Creates a bounded thread pool for lagging consumer fetch requests. // Fixed pool design: all threads persist for executor lifetime (never removed when idle). + if (poolSize == 0) return Optional.empty(); final int queueCapacity = poolSize * LAGGING_CONSUMER_QUEUE_MULTIPLIER; - return new ThreadPoolExecutor( - poolSize, // corePoolSize: fixed pool, always this many threads - poolSize, // maximumPoolSize: no dynamic scaling (core == max) - 0L, // keepAliveTime: unused for fixed pools (core threads don't time out) - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(queueCapacity), // Bounded queue prevents OOM - new InklessThreadFactory("inkless-fetch-lagging-consumer-", false), - // Why AbortPolicy: CallerRunsPolicy would block request handler threads causing broker-wide degradation - new ThreadPoolExecutor.AbortPolicy() // Reject when full, don't block callers + return Optional.of( + new ThreadPoolExecutor( + poolSize, // corePoolSize: fixed pool, always this many threads + poolSize, // maximumPoolSize: no dynamic scaling (core == max) + 0L, // keepAliveTime: unused for fixed pools (core threads don't time out) + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(queueCapacity), // Bounded queue prevents OOM + new InklessThreadFactory("inkless-fetch-lagging-consumer-", false), + // Why AbortPolicy: CallerRunsPolicy would block request handler threads causing broker-wide degradation + new ThreadPoolExecutor.AbortPolicy() // Reject when full, don't block callers + ) ); } @@ -166,10 +169,10 @@ private static ExecutorService createBoundedThreadPool(int poolSize) { int maxBatchesPerPartitionToFind, ExecutorService metadataExecutor, ExecutorService fetchDataExecutor, - ObjectFetcher laggingConsumerObjectFetcher, + Optional maybeLaggingConsumerObjectFetcher, long laggingConsumerThresholdMs, int laggingConsumerRequestRateLimit, - ExecutorService laggingFetchDataExecutor, + Optional maybeLaggingFetchDataExecutor, InklessFetchMetrics fetchMetrics, BrokerTopicStats brokerTopicStats ) { @@ -182,25 +185,31 @@ private static ExecutorService createBoundedThreadPool(int poolSize) { this.maxBatchesPerPartitionToFind = maxBatchesPerPartitionToFind; this.metadataExecutor = metadataExecutor; this.fetchDataExecutor = fetchDataExecutor; - this.laggingFetchDataExecutor = laggingFetchDataExecutor; this.laggingConsumerThresholdMs = laggingConsumerThresholdMs; - this.laggingConsumerObjectFetcher = laggingConsumerObjectFetcher; // Validate that lagging consumer resources are consistently configured: // both executor and fetcher must be null (feature disabled) or both must be non-null (feature enabled). // This ensures fail-fast behavior rather than silent runtime failure. - if ((laggingFetchDataExecutor == null) != (laggingConsumerObjectFetcher == null)) { + if (maybeLaggingFetchDataExecutor.isPresent() != maybeLaggingConsumerObjectFetcher.isPresent()) { throw new IllegalArgumentException( "Lagging consumer feature requires both laggingFetchDataExecutor and laggingConsumerObjectFetcher " + "to be non-null (feature enabled) or both to be null (feature disabled). " - + "Found: executor=" + (laggingFetchDataExecutor != null ? "non-null" : "null") - + ", fetcher=" + (laggingConsumerObjectFetcher != null ? "non-null" : "null") + + "Found: executor=" + (maybeLaggingFetchDataExecutor.isPresent() ? "non-null" : "null") + + ", fetcher=" + (maybeLaggingConsumerObjectFetcher.isPresent() ? "non-null" : "null") ); } + // Unwrap Optional to nullable fields. This is safe because: + // 1. Validation above ensures both are consistently present or absent + // 2. Internal code (FetchPlanner) uses efficient null checks: laggingExecutor != null && laggingFetcher != null + // 3. Null checks are already present throughout the codebase for feature detection + // 4. Using Optional in internal fields/parameters would add overhead in the hot fetch path + this.laggingFetchDataExecutor = maybeLaggingFetchDataExecutor.orElse(null); + this.laggingConsumerObjectFetcher = maybeLaggingConsumerObjectFetcher.orElse(null); + // Initialize rate limiter only if lagging consumer feature is enabled (executor exists) and rate limit > 0 // This avoids creating unused objects when the feature is disabled - if (laggingFetchDataExecutor != null && laggingConsumerRequestRateLimit > 0) { + if (this.laggingFetchDataExecutor != null && laggingConsumerRequestRateLimit > 0) { // Rate limiter configuration: // - capacity = rateLimit: Allows initial burst up to full rate limit (e.g., 200 tokens for 200 req/s) // - refillGreedy: Refills at rateLimit tokens per second @@ -423,8 +432,6 @@ public void close() throws IOException { if (metadataThreadPoolMonitor != null) metadataThreadPoolMonitor.close(); if (dataThreadPoolMonitor != null) dataThreadPoolMonitor.close(); if (laggingConsumerThreadPoolMonitor != null) laggingConsumerThreadPoolMonitor.close(); - objectFetcher.close(); - if (laggingConsumerObjectFetcher != null) laggingConsumerObjectFetcher.close(); fetchMetrics.close(); } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/delete/DeleteRecordsInterceptor.java b/storage/inkless/src/main/java/io/aiven/inkless/delete/DeleteRecordsInterceptor.java index 09635b3801..299403c926 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/delete/DeleteRecordsInterceptor.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/delete/DeleteRecordsInterceptor.java @@ -36,27 +36,31 @@ import io.aiven.inkless.common.SharedState; import io.aiven.inkless.common.TopicIdEnricher; import io.aiven.inkless.common.TopicTypeCounter; +import io.aiven.inkless.control_plane.ControlPlane; import io.aiven.inkless.control_plane.DeleteRecordsRequest; import io.aiven.inkless.control_plane.DeleteRecordsResponse; +import io.aiven.inkless.control_plane.MetadataView; import static org.apache.kafka.common.requests.DeleteRecordsResponse.INVALID_LOW_WATERMARK; public class DeleteRecordsInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(DeleteRecordsInterceptor.class); - private final SharedState state; + private final ControlPlane controlPlane; + private final MetadataView metadataView; private final Executor executor; private final TopicTypeCounter topicTypeCounter; public DeleteRecordsInterceptor(final SharedState state) { - this(state, Executors.newCachedThreadPool()); + this(state.controlPlane(), state.metadata(), Executors.newCachedThreadPool()); } // Visible for testing. - DeleteRecordsInterceptor(final SharedState state, final Executor executor) { - this.state = state; + DeleteRecordsInterceptor(final ControlPlane controlPlane, final MetadataView metadataView, final Executor executor) { + this.controlPlane = controlPlane; this.executor = executor; - this.topicTypeCounter = new TopicTypeCounter(this.state.metadata()); + this.metadataView = metadataView; + this.topicTypeCounter = new TopicTypeCounter(metadataView); } /** @@ -82,7 +86,7 @@ public boolean intercept(final Map offsetPerPartition, final Map offsetPerPartitionEnriched; try { - offsetPerPartitionEnriched = TopicIdEnricher.enrich(state.metadata(), offsetPerPartition); + offsetPerPartitionEnriched = TopicIdEnricher.enrich(metadataView, offsetPerPartition); } catch (final TopicIdEnricher.TopicIdNotFoundException e) { LOGGER.error("Cannot find UUID for topic {}", e.topicName); respondAllWithError(offsetPerPartition, responseCallback, Errors.UNKNOWN_SERVER_ERROR); @@ -95,7 +99,7 @@ public boolean intercept(final Map offsetPerPartition, final List requests = offsetPerPartitionEnriched.entrySet().stream() .map(kv -> new DeleteRecordsRequest(kv.getKey(), kv.getValue())) .toList(); - final List responses = state.controlPlane().deleteRecords(requests); + final List responses = controlPlane.deleteRecords(requests); final Map result = new HashMap<>(); for (int i = 0; i < responses.size(); i++) { final DeleteRecordsRequest request = requests.get(i); diff --git a/storage/inkless/src/main/java/io/aiven/inkless/delete/FileCleaner.java b/storage/inkless/src/main/java/io/aiven/inkless/delete/FileCleaner.java index 87022ab36f..16af9348ab 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/delete/FileCleaner.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/delete/FileCleaner.java @@ -63,7 +63,7 @@ public FileCleaner(SharedState sharedState) { this( sharedState.time(), sharedState.controlPlane(), - sharedState.buildStorage(), + sharedState.backgroundStorage(), sharedState.objectKeyCreator(), sharedState.config().fileCleanerRetentionPeriod() ); @@ -143,7 +143,7 @@ private void cleanFiles(Set objectKeyPaths) throws StorageBackendExcepti @Override public void close() throws IOException { - storage.close(); + // SharedState owns the storage backend lifecycle; only close component metrics here. metrics.close(); } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/merge/FileMerger.java b/storage/inkless/src/main/java/io/aiven/inkless/merge/FileMerger.java index 7bce9f39ed..b45bf77746 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/merge/FileMerger.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/merge/FileMerger.java @@ -67,12 +67,31 @@ public class FileMerger implements Runnable, Closeable { private final AtomicInteger attempts = new AtomicInteger(); public FileMerger(final SharedState sharedState) { - this.brokerId = sharedState.brokerId(); - this.time = sharedState.time(); - this.config = sharedState.config(); - this.controlPlane = sharedState.controlPlane(); - this.storage = sharedState.buildStorage(); - this.objectKeyCreator = sharedState.objectKeyCreator(); + this( + sharedState.time(), + sharedState.config(), + sharedState.controlPlane(), + sharedState.backgroundStorage(), + sharedState.objectKeyCreator(), + sharedState.brokerId(), + sharedState.config().fileMergeWorkDir() + ); + } + + // Visible for testing. + FileMerger(final Time time, + final InklessConfig config, + final ControlPlane controlPlane, + final StorageBackend storage, + final ObjectKeyCreator objectKeyCreator, + final int brokerId, + final Path workDir) { + this.time = time; + this.config = config; + this.controlPlane = controlPlane; + this.storage = storage; + this.objectKeyCreator = objectKeyCreator; + this.brokerId = brokerId; this.metrics = new FileMergerMetrics(); // This backoff is needed only for jitter, there's no exponent in it. @@ -80,7 +99,7 @@ public FileMerger(final SharedState sharedState) { final var noWorkBackoff = new ExponentialBackoff(noWorkBackoffDuration, 1, noWorkBackoffDuration * 2, 0.2); noWorkBackoffSupplier = () -> noWorkBackoff.backoff(1); - this.workDir = config.fileMergeWorkDir(); + this.workDir = workDir; } @Override @@ -239,7 +258,7 @@ private void tryDeleteFile(ObjectKey objectKey, Exception e) { @Override public void close() throws IOException { - storage.close(); + // Storage backend is managed by SharedState; avoid double-closing shared clients. metrics.close(); } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java index c54670709e..53afcff96d 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java @@ -56,7 +56,7 @@ public AppendHandler(final SharedState state) { state.time(), state.brokerId(), state.objectKeyCreator(), - state.buildStorage(), + state.produceStorage(), state.keyAlignmentStrategy(), state.cache(), state.batchCoordinateCache(), diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java index 7096393d4d..a85c8d9847 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java @@ -70,7 +70,6 @@ class Writer implements Closeable { private final Lock lock = new ReentrantLock(); private ActiveFile activeFile; - private final StorageBackend storage; private final FileCommitter fileCommitter; private final Time time; private final Duration commitInterval; @@ -103,12 +102,19 @@ class Writer implements Closeable { commitInterval, maxBufferSize, Executors.newScheduledThreadPool(1, new InklessThreadFactory("inkless-file-commit-ticker-", true)), - storage, new FileCommitter( - brokerId, controlPlane, objectKeyCreator, storage, - keyAlignmentStrategy, objectCache, batchCoordinateCache, time, - maxFileUploadAttempts, fileUploadRetryBackoff, - fileUploaderThreadPoolSize), + brokerId, + controlPlane, + objectKeyCreator, + storage, + keyAlignmentStrategy, + objectCache, + batchCoordinateCache, + time, + maxFileUploadAttempts, + fileUploadRetryBackoff, + fileUploaderThreadPoolSize + ), new WriterMetrics(time), brokerTopicStats ); @@ -119,7 +125,6 @@ class Writer implements Closeable { final Duration commitInterval, final int maxBufferSize, final ScheduledExecutorService commitTickScheduler, - final StorageBackend storage, final FileCommitter fileCommitter, final WriterMetrics writerMetrics, final BrokerTopicStats brokerTopicStats) { @@ -130,7 +135,6 @@ class Writer implements Closeable { } this.maxBufferSize = maxBufferSize; this.commitTickScheduler = Objects.requireNonNull(commitTickScheduler, "commitTickScheduler cannot be null"); - this.storage = Objects.requireNonNull(storage, "storage cannot be null"); this.fileCommitter = Objects.requireNonNull(fileCommitter, "fileCommitter cannot be null"); this.writerMetrics = Objects.requireNonNull(writerMetrics, "writerMetrics cannot be null"); this.brokerTopicStats = brokerTopicStats; @@ -227,7 +231,6 @@ public void close() throws IOException { // Rotate file before closing the uploader so the file gets into the queue first. rotateFile(true); fileCommitter.close(); - storage.close(); writerMetrics.close(); } finally { lock.unlock(); diff --git a/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorage.java b/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorage.java index d88e0d2ec5..577fce56e6 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorage.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorage.java @@ -18,7 +18,6 @@ package io.aiven.inkless.storage_backend.in_memory; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.utils.Time; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -47,8 +46,8 @@ public final class InMemoryStorage extends StorageBackend { private final ConcurrentHashMap storage = new ConcurrentHashMap<>(); // needed for reflection based instantiation - public InMemoryStorage() { - super(new Metrics(Time.SYSTEM)); + public InMemoryStorage(final Metrics metrics) { + super(metrics); } @Override diff --git a/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java b/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java new file mode 100644 index 0000000000..9ee948a7b4 --- /dev/null +++ b/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java @@ -0,0 +1,122 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.common; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; + +import io.aiven.inkless.config.InklessConfig; +import io.aiven.inkless.control_plane.ControlPlane; +import io.aiven.inkless.control_plane.MetadataView; +import io.aiven.inkless.storage_backend.common.StorageBackend; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) +class SharedStateTest { + + @Mock + InklessConfig config; + + @Mock + MetadataView metadataView; + + @Mock + ControlPlane controlPlane; + + @Mock + BrokerTopicStats brokerTopicStats; + + @Mock + StorageBackend firstBackend; + + @Mock + StorageBackend secondBackend; + + @Mock + StorageBackend thirdBackend; + + @Mock + StorageBackend fourthBackend; + + @BeforeEach + void setupConfig() { + when(config.fileCleanerRetentionPeriod()).thenReturn(Duration.ofMillis(2000)); + when(config.isBatchCoordinateCacheEnabled()).thenReturn(true); + when(config.batchCoordinateCacheTtl()).thenReturn(Duration.ofMillis(100)); + when(config.cacheMaxCount()).thenReturn(10L); + when(config.cacheExpirationLifespanSec()).thenReturn(30); + when(config.cacheExpirationMaxIdleSec()).thenReturn(10); + when(config.fetchLaggingConsumerThreadPoolSize()).thenReturn(1); + } + + @Test + void shouldCloseResourcesInReverseOrderOnFailure() throws Exception { + final AtomicInteger storageCallCount = new AtomicInteger(); + + when(config.storage(any(Metrics.class))).thenAnswer(invocation -> { + int count = storageCallCount.incrementAndGet(); + if (count == 3) { + throw new RuntimeException("Failure creating third storage"); + } + return switch (count) { + case 1 -> firstBackend; + case 2 -> secondBackend; + case 4 -> fourthBackend; + default -> thirdBackend; + }; + }); + + assertThatThrownBy(() -> SharedState.initialize( + Time.SYSTEM, + 1, + config, + metadataView, + controlPlane, + brokerTopicStats, + () -> mock(LogConfig.class) + )).isInstanceOf(RuntimeException.class) + .hasMessageContaining("Failed to initialize SharedState"); + + final InOrder inOrder = inOrder(firstBackend, secondBackend, thirdBackend, fourthBackend); + inOrder.verify(firstBackend).close(); + inOrder.verify(secondBackend).close(); + inOrder.verify(thirdBackend, times(0)).close(); + inOrder.verify(fourthBackend, times(0)).close(); + } +} diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java index cfbcc6d80e..d9c993647d 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java @@ -70,7 +70,7 @@ import io.aiven.inkless.control_plane.FindBatchRequest; import io.aiven.inkless.control_plane.FindBatchResponse; import io.aiven.inkless.generated.FileExtent; -import io.aiven.inkless.storage_backend.common.ObjectFetcher; +import io.aiven.inkless.storage_backend.common.StorageBackend; import io.aiven.inkless.storage_backend.common.StorageBackendException; import static java.util.concurrent.TimeUnit.SECONDS; @@ -109,7 +109,7 @@ public class ReaderTest { @Mock private ControlPlane controlPlane; @Mock - private ObjectFetcher objectFetcher; + private StorageBackend objectFetcher; @Mock private FetchParams fetchParams; @Mock @@ -133,7 +133,7 @@ public void testClose() throws Exception { reader.close(); verify(metadataExecutor, atLeastOnce()).shutdown(); verify(fetchDataExecutor, atLeastOnce()).shutdown(); - verify(laggingFetchDataExecutor, atLeastOnce()).shutdown(); + // laggingFetchDataExecutor is not used by getReader() which uses Optional.empty() } @Nested @@ -690,10 +690,10 @@ private Reader getReader() { 0, metadataExecutor, fetchDataExecutor, - objectFetcher, + Optional.empty(), Long.MAX_VALUE, 0, - laggingFetchDataExecutor, + Optional.empty(), fetchMetrics, new BrokerTopicStats()); } @@ -716,6 +716,9 @@ class RateLimitingTests { private ExecutorService fetchDataExecutor; private ExecutorService laggingFetchDataExecutor; + @Mock + private StorageBackend laggingObjectFetcher; + @BeforeEach public void setup() { metadataExecutor = Executors.newSingleThreadExecutor(); @@ -766,8 +769,8 @@ public void testRateLimitingWithLoad() throws Exception { .thenReturn(List.of(oldResponse)); final ReadableByteChannel channel = mock(ReadableByteChannel.class); - when(objectFetcher.fetch(any(), any())).thenReturn(channel); - when(objectFetcher.readToByteBuffer(channel)).thenReturn(records.buffer()); + when(laggingObjectFetcher.fetch(any(), any())).thenReturn(channel); + when(laggingObjectFetcher.readToByteBuffer(channel)).thenReturn(records.buffer()); try (final var reader = new Reader( time, @@ -779,10 +782,10 @@ public void testRateLimitingWithLoad() throws Exception { 0, metadataExecutor, fetchDataExecutor, - objectFetcher, + Optional.of(laggingObjectFetcher), LAGGING_THRESHOLD_MS, RATE_LIMIT_REQ_PER_SEC, - laggingFetchDataExecutor, + Optional.of(laggingFetchDataExecutor), fetchMetrics, new BrokerTopicStats())) { @@ -845,8 +848,8 @@ public void testRateLimitingDisabled() throws Exception { .thenReturn(List.of(oldResponse)); final ReadableByteChannel channel = mock(ReadableByteChannel.class); - when(objectFetcher.fetch(any(), any())).thenReturn(channel); - when(objectFetcher.readToByteBuffer(channel)).thenReturn(records.buffer()); + when(laggingObjectFetcher.fetch(any(), any())).thenReturn(channel); + when(laggingObjectFetcher.readToByteBuffer(channel)).thenReturn(records.buffer()); try (final var reader = new Reader( time, @@ -858,10 +861,10 @@ public void testRateLimitingDisabled() throws Exception { 0, metadataExecutor, fetchDataExecutor, - objectFetcher, + Optional.of(laggingObjectFetcher), LAGGING_THRESHOLD_MS, 0, // Rate limiting disabled - laggingFetchDataExecutor, + Optional.of(laggingFetchDataExecutor), fetchMetrics, new BrokerTopicStats())) { @@ -993,10 +996,10 @@ public void testMixedLaggingAndRecentPartitions() throws Exception { 0, testMetadataExecutor, testFetchDataExecutor, - objectFetcher, // Use same fetcher for lagging to simplify test + Optional.of(laggingObjectFetcher), LAGGING_THRESHOLD_MS, RATE_LIMIT_REQ_PER_SEC, - saturatedLaggingExecutor, // Saturated executor for lagging path + Optional.of(saturatedLaggingExecutor), // Saturated executor for lagging path fetchMetrics, new BrokerTopicStats())) { @@ -1088,10 +1091,10 @@ public void testRecentDataBypassesRateLimiting() throws Exception { 0, metadataExecutor, fetchDataExecutor, - objectFetcher, + Optional.of(objectFetcher), LAGGING_THRESHOLD_MS, RATE_LIMIT_REQ_PER_SEC, - laggingFetchDataExecutor, + Optional.of(laggingFetchDataExecutor), fetchMetrics, new BrokerTopicStats())) { diff --git a/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java b/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java index 43e197dbd2..7ee6ec50aa 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java @@ -22,10 +22,6 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.DeleteRecordsResponseData; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.storage.internals.log.LogConfig; -import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -36,22 +32,10 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import java.time.Duration; import java.util.List; import java.util.Map; import java.util.function.Consumer; -import java.util.function.Supplier; - -import io.aiven.inkless.cache.BatchCoordinateCache; -import io.aiven.inkless.cache.CaffeineBatchCoordinateCache; -import io.aiven.inkless.cache.FixedBlockAlignment; -import io.aiven.inkless.cache.KeyAlignmentStrategy; -import io.aiven.inkless.cache.NullCache; -import io.aiven.inkless.cache.ObjectCache; -import io.aiven.inkless.common.ObjectKey; -import io.aiven.inkless.common.ObjectKeyCreator; -import io.aiven.inkless.common.SharedState; -import io.aiven.inkless.config.InklessConfig; + import io.aiven.inkless.control_plane.ControlPlane; import io.aiven.inkless.control_plane.DeleteRecordsRequest; import io.aiven.inkless.control_plane.DeleteRecordsResponse; @@ -71,25 +55,12 @@ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) class DeleteRecordsInterceptorTest { - static final int BROKER_ID = 11; - static final ObjectKeyCreator OBJECT_KEY_CREATOR = ObjectKey.creator("", false); - private static final KeyAlignmentStrategy KEY_ALIGNMENT_STRATEGY = new FixedBlockAlignment(Integer.MAX_VALUE); - private static final ObjectCache OBJECT_CACHE = new NullCache(); - private static final BatchCoordinateCache BATCH_COORDINATE_CACHE = new CaffeineBatchCoordinateCache(Duration.ofSeconds(30)); - - static final Supplier DEFAULT_TOPIC_CONFIGS = () -> new LogConfig(Map.of()); - - Time time = new MockTime(); - @Mock - InklessConfig disklessConfig; @Mock MetadataView metadataView; @Mock ControlPlane controlPlane; @Mock Consumer> responseCallback; - @Mock - BrokerTopicStats brokerTopicStats; @Captor ArgumentCaptor> resultCaptor; @@ -100,20 +71,8 @@ class DeleteRecordsInterceptorTest { public void mixingDisklessAndClassicTopicsIsNotAllowed() { when(metadataView.isDisklessTopic(eq("diskless"))).thenReturn(true); when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false); - final SharedState state = new SharedState( - time, - BROKER_ID, - disklessConfig, - metadataView, - controlPlane, - OBJECT_KEY_CREATOR, - KEY_ALIGNMENT_STRATEGY, - OBJECT_CACHE, - BATCH_COORDINATE_CACHE, - brokerTopicStats, - DEFAULT_TOPIC_CONFIGS - ); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(state); + + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(controlPlane, metadataView, new SynchronousExecutor()); final Map entriesPerPartition = Map.of( new TopicPartition("diskless", 0), @@ -144,9 +103,8 @@ public void mixingDisklessAndClassicTopicsIsNotAllowed() { @Test public void notInterceptDeletingRecordsFromClassicTopics() { when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS)); + + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(controlPlane, metadataView, new SynchronousExecutor()); final Map entriesPerPartition = Map.of( new TopicPartition("non_diskless", 0), 4567L @@ -181,10 +139,8 @@ public void interceptDeletingRecordsFromDisklessTopics() { } }); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(controlPlane, metadataView, new SynchronousExecutor()); final TopicPartition tp0 = new TopicPartition("diskless", 0); final TopicPartition tp1 = new TopicPartition("diskless", 1); @@ -224,10 +180,7 @@ public void controlPlaneException() { when(controlPlane.deleteRecords(anyList())).thenThrow(new RuntimeException("test")); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(controlPlane, metadataView, new SynchronousExecutor()); final TopicPartition topicPartition = new TopicPartition("diskless", 1); final Map entriesPerPartition = Map.of( @@ -265,10 +218,7 @@ public void topicIdNotFound() { } }); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(controlPlane, metadataView, new SynchronousExecutor()); final TopicPartition topicPartition1 = new TopicPartition("diskless1", 1); final TopicPartition topicPartition2 = new TopicPartition("diskless2", 2); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java b/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java index 94938f471a..221bd56242 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java @@ -41,18 +41,14 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.io.TempDir; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import java.io.IOException; -import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -100,8 +96,6 @@ @Testcontainers @Tag("integration") class FileCleanerIntegrationTest { - private static final Logger LOGGER = LoggerFactory.getLogger(FileCleanerIntegrationTest.class); - @Container static final MinioContainer MINIO = S3TestContainer.minio(); @@ -134,8 +128,6 @@ class FileCleanerIntegrationTest { MetadataView metadataView; @Mock Supplier defaultTopicConfigs; - @TempDir - Path logDir; ControlPlane controlPlane; SharedState sharedState; diff --git a/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerMockedTest.java b/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerMockedTest.java index a9bbd67f52..c083d7de81 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerMockedTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerMockedTest.java @@ -41,6 +41,7 @@ import io.aiven.inkless.control_plane.FileToDelete; import io.aiven.inkless.storage_backend.common.StorageBackend; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -58,6 +59,14 @@ class FileCleanerMockedTest { static final ObjectKeyCreator OBJECT_KEY_CREATOR = ObjectKey.creator("", false); + @Test + void close() throws Exception { + final var cleaner = new FileCleaner(time, controlPlane, storageBackend, OBJECT_KEY_CREATOR, RETENTION_PERIOD); + cleaner.close(); + // storage is shared across background jobs and is managed by SharedState. + verify(storageBackend, never()).close(); + } + @Test void empty() throws Exception { final var cleaner = new FileCleaner(time, controlPlane, storageBackend, OBJECT_KEY_CREATOR, RETENTION_PERIOD); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java index bafd7b95ec..e0d526dcec 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java @@ -39,7 +39,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; @@ -154,8 +153,6 @@ static void tearDownS3() { ControlPlane controlPlane; SharedState sharedState; - @TempDir - Path logDir; @BeforeEach void setup() { @@ -271,10 +268,9 @@ private void writeRecords(final AppendHandler appendHandler) { } CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).orTimeout(60, TimeUnit.SECONDS); - futures.forEach(response -> { - response.join() - .forEach((tp, partitionResponse) -> assertThat(partitionResponse.error).isEqualTo(Errors.NONE)); - }); + futures.forEach(response -> + response.join().forEach((tp, partitionResponse) -> + assertThat(partitionResponse.error).isEqualTo(Errors.NONE))); } private Map getHighWatermarks(final ControlPlane controlPlane) { diff --git a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java index 436a522b48..fa13cb2d94 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java @@ -21,10 +21,8 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -46,12 +44,11 @@ import java.time.Duration; import java.time.Instant; import java.util.List; -import java.util.function.Supplier; import io.aiven.inkless.common.ObjectFormat; import io.aiven.inkless.common.ObjectKey; +import io.aiven.inkless.common.ObjectKeyCreator; import io.aiven.inkless.common.PlainObjectKey; -import io.aiven.inkless.common.SharedState; import io.aiven.inkless.config.InklessConfig; import io.aiven.inkless.control_plane.BatchInfo; import io.aiven.inkless.control_plane.BatchMetadata; @@ -59,7 +56,6 @@ import io.aiven.inkless.control_plane.ControlPlaneException; import io.aiven.inkless.control_plane.FileMergeWorkItem; import io.aiven.inkless.control_plane.MergedFileBatch; -import io.aiven.inkless.control_plane.MetadataView; import io.aiven.inkless.storage_backend.common.StorageBackend; import io.aiven.inkless.storage_backend.common.StorageBackendException; @@ -96,6 +92,7 @@ class FileMergerMockedTest { static final TopicIdPartition T1P0 = new TopicIdPartition(TOPIC_ID_1, 0, TOPIC_1); static final TopicIdPartition T1P1 = new TopicIdPartition(TOPIC_ID_1, 1, TOPIC_1); public static final Path WORK_DIR = Path.of("/tmp/inkless/file-merge"); + public static final ObjectKeyCreator OBJECT_KEY_CREATOR = ObjectKey.creator("prefix", false); @Mock Time time; @@ -110,28 +107,23 @@ class FileMergerMockedTest { @Captor ArgumentCaptor sleepCaptor; - SharedState sharedState; - - @BeforeEach - void setup() { - when(inklessConfig.objectKeyPrefix()).thenReturn("prefix"); - when(inklessConfig.fileMergeWorkDir()).thenReturn(WORK_DIR); - when(inklessConfig.cacheMaxCount()).thenReturn(10000L); - - sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, mock(MetadataView.class), controlPlane, - mock(BrokerTopicStats.class), mock(Supplier.class)); - } - @AfterEach void tearDown() { assertThat(WORK_DIR).isEmptyDirectory(); } + @Test + void close() throws Exception { + final FileMerger fileMerger = new FileMerger(time, inklessConfig, controlPlane, storage, OBJECT_KEY_CREATOR, BROKER_ID, WORK_DIR); + fileMerger.close(); + // storage is shared across background jobs and is managed by SharedState. + verify(storage, never()).close(); + } + @Test void singleFileSingleBatch() throws StorageBackendException, IOException { when(inklessConfig.produceMaxUploadAttempts()).thenReturn(1); when(inklessConfig.produceUploadBackoff()).thenReturn(Duration.ZERO); - when(inklessConfig.storage(any())).thenReturn(storage); final String obj1 = "obj1"; @@ -167,7 +159,7 @@ void singleFileSingleBatch() throws StorageBackendException, IOException { new FileMergeWorkItem(WORK_ITEM_ID, Instant.ofEpochMilli(1234), List.of(file1InWorkItem)) ); - final FileMerger fileMerger = new FileMerger(sharedState); + final FileMerger fileMerger = new FileMerger(time, inklessConfig, controlPlane, storage, OBJECT_KEY_CREATOR, BROKER_ID, WORK_DIR); fileMerger.run(); verify(storage).fetch(PlainObjectKey.create("", obj1), null); @@ -187,7 +179,6 @@ void singleFileSingleBatch() throws StorageBackendException, IOException { void twoFilesWithGaps(final boolean directFileOrder, final boolean directBatchOrder) throws StorageBackendException, IOException { when(inklessConfig.produceMaxUploadAttempts()).thenReturn(1); when(inklessConfig.produceUploadBackoff()).thenReturn(Duration.ZERO); - when(inklessConfig.storage(any())).thenReturn(storage); final String obj1 = "obj1"; final String obj2 = "obj2"; @@ -291,7 +282,7 @@ void twoFilesWithGaps(final boolean directFileOrder, final boolean directBatchOr new FileMergeWorkItem(WORK_ITEM_ID, Instant.ofEpochMilli(1234), files) ); - final FileMerger fileMerger = new FileMerger(sharedState); + final FileMerger fileMerger = new FileMerger(time, inklessConfig, controlPlane, storage, OBJECT_KEY_CREATOR, BROKER_ID, WORK_DIR); fileMerger.run(); verify(storage).fetch(PlainObjectKey.create("", obj1), null); @@ -307,7 +298,7 @@ void twoFilesWithGaps(final boolean directFileOrder, final boolean directBatchOr void mustSleepWhenNoWorkItem() { when(controlPlane.getFileMergeWorkItem()).thenReturn(null); - final FileMerger fileMerger = new FileMerger(sharedState); + final FileMerger fileMerger = new FileMerger(time, inklessConfig, controlPlane, storage, OBJECT_KEY_CREATOR, BROKER_ID, WORK_DIR); fileMerger.run(); verify(time).sleep(sleepCaptor.capture()); assertThat(sleepCaptor.getValue()).isBetween((long) (10_000L * 0.8), (long) (20_000L * 1.2)); @@ -317,8 +308,6 @@ void mustSleepWhenNoWorkItem() { @Test void errorInReading() throws Exception { - when(inklessConfig.storage(any())).thenReturn(storage); - final String obj1 = "obj1"; final long batch1Id = 1; when(storage.fetch(any(ObjectKey.class), isNull())) @@ -331,7 +320,7 @@ void errorInReading() throws Exception { new FileMergeWorkItem(WORK_ITEM_ID, Instant.ofEpochMilli(1234), List.of(file1InWorkItem)) ); - final FileMerger fileMerger = new FileMerger(sharedState); + final FileMerger fileMerger = new FileMerger(time, inklessConfig, controlPlane, storage, OBJECT_KEY_CREATOR, BROKER_ID, WORK_DIR); fileMerger.run(); verify(controlPlane).releaseFileMergeWorkItem(eq(WORK_ITEM_ID)); @@ -345,7 +334,6 @@ void errorInReading() throws Exception { @Test void errorInWriting() throws Exception { - when(inklessConfig.storage(any())).thenReturn(storage); when(inklessConfig.produceMaxUploadAttempts()).thenReturn(1); when(inklessConfig.produceUploadBackoff()).thenReturn(Duration.ZERO); @@ -373,7 +361,7 @@ void errorInWriting() throws Exception { new FileMergeWorkItem(WORK_ITEM_ID, Instant.ofEpochMilli(1234), List.of(file1InWorkItem)) ); - final FileMerger fileMerger = new FileMerger(sharedState); + final FileMerger fileMerger = new FileMerger(time, inklessConfig, controlPlane, storage, OBJECT_KEY_CREATOR, BROKER_ID, WORK_DIR); fileMerger.run(); verify(controlPlane).releaseFileMergeWorkItem(eq(WORK_ITEM_ID)); @@ -387,7 +375,6 @@ void errorInWriting() throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) void errorInCommittingFromControlPlane(boolean isSafeToDelete) throws Exception { - when(inklessConfig.storage(any())).thenReturn(storage); when(inklessConfig.produceMaxUploadAttempts()).thenReturn(1); when(inklessConfig.produceUploadBackoff()).thenReturn(Duration.ZERO); @@ -418,7 +405,7 @@ void errorInCommittingFromControlPlane(boolean isSafeToDelete) throws Exception .when(controlPlane).commitFileMergeWorkItem(anyLong(), anyString(), any(), anyInt(), anyLong(), any()); when(controlPlane.isSafeToDeleteFile(anyString())).thenReturn(isSafeToDelete); - final FileMerger fileMerger = new FileMerger(sharedState); + final FileMerger fileMerger = new FileMerger(time, inklessConfig, controlPlane, storage, OBJECT_KEY_CREATOR, BROKER_ID, WORK_DIR); fileMerger.run(); verify(controlPlane).releaseFileMergeWorkItem(eq(WORK_ITEM_ID)); @@ -431,7 +418,6 @@ void errorInCommittingFromControlPlane(boolean isSafeToDelete) throws Exception @Test void errorInCommittingNotFromControlPlane() throws Exception { - when(inklessConfig.storage(any())).thenReturn(storage); when(inklessConfig.produceMaxUploadAttempts()).thenReturn(1); when(inklessConfig.produceUploadBackoff()).thenReturn(Duration.ZERO); @@ -460,7 +446,7 @@ void errorInCommittingNotFromControlPlane() throws Exception { doThrow(new RuntimeException("test")) .when(controlPlane).commitFileMergeWorkItem(anyLong(), anyString(), any(), anyInt(), anyLong(), any()); - final FileMerger fileMerger = new FileMerger(sharedState); + final FileMerger fileMerger = new FileMerger(time, inklessConfig, controlPlane, storage, OBJECT_KEY_CREATOR, BROKER_ID, WORK_DIR); fileMerger.run(); verify(controlPlane).releaseFileMergeWorkItem(eq(WORK_ITEM_ID)); @@ -481,5 +467,4 @@ private byte[] concat(final byte[] ... arrays) { throw new RuntimeException(e); } } - } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java index 6fda9ad1e8..e1dc6bfbe2 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java @@ -40,20 +40,11 @@ import org.mockito.quality.Strictness; import java.io.IOException; -import java.time.Duration; import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; -import io.aiven.inkless.cache.BatchCoordinateCache; -import io.aiven.inkless.cache.CaffeineBatchCoordinateCache; -import io.aiven.inkless.cache.FixedBlockAlignment; -import io.aiven.inkless.cache.KeyAlignmentStrategy; -import io.aiven.inkless.cache.NullCache; -import io.aiven.inkless.cache.ObjectCache; -import io.aiven.inkless.common.ObjectKey; -import io.aiven.inkless.common.ObjectKeyCreator; import io.aiven.inkless.common.SharedState; import io.aiven.inkless.config.InklessConfig; import io.aiven.inkless.control_plane.ControlPlane; @@ -71,10 +62,6 @@ @MockitoSettings(strictness = Strictness.STRICT_STUBS) public class AppendHandlerTest { static final int BROKER_ID = 11; - static final ObjectKeyCreator OBJECT_KEY_CREATOR = ObjectKey.creator("", false); - private static final KeyAlignmentStrategy KEY_ALIGNMENT_STRATEGY = new FixedBlockAlignment(Integer.MAX_VALUE); - private static final ObjectCache OBJECT_CACHE = new NullCache(); - private static final BatchCoordinateCache BATCH_COORDINATE_CACHE = new CaffeineBatchCoordinateCache(Duration.ofSeconds(30)); static final Supplier DEFAULT_TOPIC_CONFIGS = () -> new LogConfig(Map.of()); @@ -113,9 +100,11 @@ public class AppendHandlerTest { @Test public void rejectTransactionalProduce() throws Exception { - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try ( + final SharedState sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, metadataView, controlPlane, + brokerTopicStats, DEFAULT_TOPIC_CONFIGS); + final AppendHandler interceptor = new AppendHandler(sharedState, writer) + ) { final TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, "inkless1"); final TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), 0, "inkless2"); @@ -137,9 +126,11 @@ topicIdPartition2, new PartitionResponse(Errors.INVALID_REQUEST) @Test public void emptyRequests() throws Exception { - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try ( + final SharedState sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, metadataView, controlPlane, + brokerTopicStats, DEFAULT_TOPIC_CONFIGS); + final AppendHandler interceptor = new AppendHandler(sharedState, writer) + ) { final Map entriesPerPartition = Map.of(); @@ -165,9 +156,11 @@ topicIdPartition, new PartitionResponse(Errors.NONE) ); when(metadataView.getTopicConfig(any())).thenReturn(new Properties()); - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try ( + final SharedState sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, metadataView, controlPlane, + brokerTopicStats, DEFAULT_TOPIC_CONFIGS); + final AppendHandler interceptor = new AppendHandler(sharedState, writer) + ) { final var result = interceptor.handle(entriesPerPartition, requestLocal).get(); assertThat(result).isEqualTo(writeResult); @@ -188,21 +181,24 @@ public void writeFutureFailed() throws Exception { ); when(metadataView.getTopicConfig(any())).thenReturn(new Properties()); - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try ( + final SharedState sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, metadataView, controlPlane, + brokerTopicStats, DEFAULT_TOPIC_CONFIGS); + final AppendHandler interceptor = new AppendHandler(sharedState, writer) + ) { assertThatThrownBy(() -> interceptor.handle(entriesPerPartition, requestLocal).get()).hasCause(exception); } } @Test public void close() throws IOException { - final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer); + try (final SharedState sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, metadataView, controlPlane, + brokerTopicStats, DEFAULT_TOPIC_CONFIGS)) { + final AppendHandler interceptor = new AppendHandler(sharedState, writer); - interceptor.close(); + interceptor.close(); - verify(writer).close(); + verify(writer).close(); + } } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterMockedTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterMockedTest.java index 1a22decfea..0a765057d1 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterMockedTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterMockedTest.java @@ -110,7 +110,7 @@ void setup() { @Test void tickWithEmptyFile() throws InterruptedException { final Writer writer = new Writer( - time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats); + time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats); writer.tick(); @@ -121,7 +121,7 @@ void tickWithEmptyFile() throws InterruptedException { @Test void tickIsScheduledWhenFileIsWrittenTo() { final Writer writer = new Writer( - time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats); + time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats); final Map writeRequest = Map.of( T0P0, recordCreator.create(T0P0.topicPartition(), 100) @@ -136,7 +136,7 @@ void committingDueToOverfillWithFirstRequest() throws InterruptedException { when(time.nanoseconds()).thenReturn(10_000_000L); final Writer writer = new Writer( - time, Duration.ofMillis(1), 15908, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats); + time, Duration.ofMillis(1), 15908, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats); final Map writeRequest = Map.of( T0P0, recordCreator.create(T0P0.topicPartition(), 100), @@ -156,7 +156,7 @@ void committingDueToOverfillWithFirstRequest() throws InterruptedException { @Test void committingDueToOverfillBeforeLastRequest() throws InterruptedException { final Writer writer = new Writer( - time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats); + time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats); final Map writeRequest0 = Map.of( T0P0, recordCreator.create(T0P0.topicPartition(), 1), @@ -189,7 +189,7 @@ void committingDueToOverfillBeforeLastRequest() throws InterruptedException { @Test void committingDueToOverfillBeforeAfterLastRequest() throws InterruptedException { final Writer writer = new Writer( - time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats); + time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats); final Map writeRequest0 = Map.of( T0P0, recordCreator.create(T0P0.topicPartition(), 1), @@ -220,7 +220,7 @@ void committingDueToOverfillBeforeAfterLastRequest() throws InterruptedException @Test void committingOnTick() throws InterruptedException { final Writer writer = new Writer( - time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats); + time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats); final Map writeRequest = Map.of( T0P0, recordCreator.create(T0P0.topicPartition(), 1), @@ -241,7 +241,7 @@ void committingOnTick() throws InterruptedException { @Test void committingDueToClose() throws InterruptedException, IOException { final Writer writer = new Writer( - time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats); + time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats); final Map writeRequest = Map.of( T0P0, recordCreator.create(T0P0.topicPartition(), 1), @@ -262,7 +262,7 @@ void committingDueToClose() throws InterruptedException, IOException { @Test void writeAfterRotation() throws InterruptedException { final Writer writer = new Writer( - time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats); + time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats); final Map writeRequest0 = Map.of( T0P0, recordCreator.create(T0P0.topicPartition(), 100), @@ -290,7 +290,7 @@ void writeAfterRotation() throws InterruptedException { @Test void close() throws IOException { final Writer writer = new Writer( - time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats); + time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats); reset(commitTickScheduler); writer.close(); @@ -302,7 +302,7 @@ void close() throws IOException { @Test void closeAfterClose() throws IOException { final Writer writer = new Writer( - time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats); + time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats); writer.close(); reset(commitTickScheduler); @@ -319,7 +319,7 @@ void closeAfterClose() throws IOException { @Test void tickAfterClose() throws IOException { final Writer writer = new Writer( - time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats); + time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats); writer.close(); reset(commitTickScheduler); @@ -336,7 +336,7 @@ void tickAfterClose() throws IOException { @Test void writeAfterClose() throws IOException { final Writer writer = new Writer( - time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats); + time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats); writer.close(); reset(commitTickScheduler); reset(fileCommitter); @@ -358,7 +358,7 @@ void writeAfterClose() throws IOException { @Test void commitInterrupted() throws InterruptedException, IOException { final Writer writer = new Writer( - time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats); + time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats); final InterruptedException interruptedException = new InterruptedException(); doThrow(interruptedException).when(fileCommitter).commit(any()); @@ -381,35 +381,32 @@ void commitInterrupted() throws InterruptedException, IOException { @Test void constructorInvalidArguments() { assertThatThrownBy(() -> new Writer( - null, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats)) + null, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats)) .isInstanceOf(NullPointerException.class) .hasMessage("time cannot be null"); assertThatThrownBy(() -> new Writer( - time, null, 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats)) + time, null, 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats)) .isInstanceOf(NullPointerException.class) .hasMessage("commitInterval cannot be null"); assertThatThrownBy(() -> new Writer( - time, Duration.ofMillis(1), 0, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats)) + time, Duration.ofMillis(1), 0, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("maxBufferSize must be positive"); assertThatThrownBy(() -> - new Writer(time, Duration.ofMillis(1), 8 * 1024, null, storage, fileCommitter, writerMetrics, brokerTopicStats)) + new Writer(time, Duration.ofMillis(1), 8 * 1024, null, fileCommitter, writerMetrics, brokerTopicStats)) .isInstanceOf(NullPointerException.class) .hasMessage("commitTickScheduler cannot be null"); - assertThatThrownBy(() -> new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, null, brokerTopicStats)) + assertThatThrownBy(() -> new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, null, brokerTopicStats)) .isInstanceOf(NullPointerException.class) .hasMessage("writerMetrics cannot be null"); - assertThatThrownBy(() -> new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, null, writerMetrics, brokerTopicStats)) + assertThatThrownBy(() -> new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, null, writerMetrics, brokerTopicStats)) .isInstanceOf(NullPointerException.class) .hasMessage("fileCommitter cannot be null"); - assertThatThrownBy(() -> new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, null, fileCommitter, writerMetrics, brokerTopicStats)) - .isInstanceOf(NullPointerException.class) - .hasMessage("storage cannot be null"); } @Test void writeNull() { - final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats); + final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats); assertThatThrownBy(() -> writer.write(null, TOPIC_CONFIGS, REQUEST_LOCAL)) .isInstanceOf(NullPointerException.class) @@ -424,7 +421,7 @@ void writeNull() { @Test void writeEmptyRequests() { - final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats); + final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats); assertThatThrownBy(() -> writer.write(Map.of(), TOPIC_CONFIGS, REQUEST_LOCAL)) .isInstanceOf(IllegalArgumentException.class) @@ -433,7 +430,7 @@ void writeEmptyRequests() { @Test void entriesTopicConfigMismatch() { - final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats); + final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats); assertThatThrownBy(() -> writer.write(Map.of(T0P0, MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(new byte[10]))), Map.of(TOPIC_1, new LogConfig(Map.of())), REQUEST_LOCAL)) .isInstanceOf(IllegalArgumentException.class) diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java index e273fbbad8..aa65232fe5 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java @@ -248,7 +248,6 @@ void test(final int requestCount, Duration.ofMillis(commitIntervalMsAvg), // it doesn't matter as the scheduling doesn't happen maxBufferSize, mock(ScheduledExecutorService.class), - storage, fileCommitter, mock(WriterMetrics.class), new BrokerTopicStats() diff --git a/storage/inkless/src/test/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorageTest.java b/storage/inkless/src/test/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorageTest.java index dd9a0a496b..dceab46204 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorageTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorageTest.java @@ -17,6 +17,8 @@ */ package io.aiven.inkless.storage_backend.in_memory; +import org.apache.kafka.common.metrics.Metrics; + import org.junit.jupiter.api.Test; import java.io.ByteArrayInputStream; @@ -36,10 +38,9 @@ class InMemoryStorageTest { static final PlainObjectKey OBJECT_KEY = PlainObjectKey.create("a", "b"); - @Test void fetchNulls() { - final InMemoryStorage storage = new InMemoryStorage(); + final InMemoryStorage storage = new InMemoryStorage(new Metrics()); assertThatThrownBy(() -> storage.fetch(null, new ByteRange(0, 10))) .isInstanceOf(NullPointerException.class) .hasMessage("key cannot be null"); @@ -50,7 +51,7 @@ void fetchNulls() { @Test void deleteNulls() { - final InMemoryStorage storage = new InMemoryStorage(); + final InMemoryStorage storage = new InMemoryStorage(new Metrics()); assertThatThrownBy(() -> storage.delete((ObjectKey) null)) .isInstanceOf(NullPointerException.class) .hasMessage("key cannot be null"); @@ -61,14 +62,14 @@ void deleteNulls() { @Test void fetchNonExistent() { - final InMemoryStorage storage = new InMemoryStorage(); + final InMemoryStorage storage = new InMemoryStorage(new Metrics()); assertThatThrownBy(() -> storage.fetch(OBJECT_KEY, ByteRange.maxRange())) .isInstanceOf(KeyNotFoundException.class); } @Test void uploadAndFetch() throws StorageBackendException, IOException { - final InMemoryStorage storage = new InMemoryStorage(); + final InMemoryStorage storage = new InMemoryStorage(new Metrics()); final byte[] data = new byte[10]; storage.upload(OBJECT_KEY, new ByteArrayInputStream(data), data.length); @@ -79,7 +80,7 @@ void uploadAndFetch() throws StorageBackendException, IOException { @Test void fetchRanged() throws StorageBackendException, IOException { - final InMemoryStorage storage = new InMemoryStorage(); + final InMemoryStorage storage = new InMemoryStorage(new Metrics()); final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7}; storage.upload(OBJECT_KEY, new ByteArrayInputStream(data), data.length); @@ -92,7 +93,7 @@ void fetchRanged() throws StorageBackendException, IOException { @Test void fetchOutsideOfSize() throws StorageBackendException { - final InMemoryStorage storage = new InMemoryStorage(); + final InMemoryStorage storage = new InMemoryStorage(new Metrics()); final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7}; storage.upload(OBJECT_KEY, new ByteArrayInputStream(data), data.length); @@ -103,7 +104,7 @@ void fetchOutsideOfSize() throws StorageBackendException { @Test void delete() throws StorageBackendException, IOException { - final InMemoryStorage storage = new InMemoryStorage(); + final InMemoryStorage storage = new InMemoryStorage(new Metrics()); final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7}; storage.upload(OBJECT_KEY, new ByteArrayInputStream(data), data.length); @@ -118,7 +119,7 @@ void delete() throws StorageBackendException, IOException { @Test void deleteMany() throws StorageBackendException, IOException { - final InMemoryStorage storage = new InMemoryStorage(); + final InMemoryStorage storage = new InMemoryStorage(new Metrics()); final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7}; storage.upload(OBJECT_KEY, new ByteArrayInputStream(data), data.length);