diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 3e6dcc78cc..a736834c99 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -91,7 +91,7 @@ import org.mockito.ArgumentMatchers._ import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer -import org.mockito.{Answers, ArgumentCaptor, ArgumentMatchers, MockedConstruction} +import org.mockito.{ArgumentCaptor, ArgumentMatchers, MockedConstruction} import java.io.{ByteArrayInputStream, File} import java.net.InetAddress @@ -6495,7 +6495,7 @@ class ReplicaManagerTest { } @Test - def testAppendWithInvalidDisklessAndValidCLassic(): Unit = { + def testAppendWithInvalidDisklessAndValidClassic(): Unit = { val entriesPerPartition = Map( disklessTopicPartition -> RECORDS, classicTopicPartition -> RECORDS, @@ -6643,7 +6643,7 @@ class ReplicaManagerTest { val replicaManager = try { createReplicaManager( List(disklessTopicPartition.topic(), disklessTopicPartition2.topic()), - controlPlane = Some(cp), + controlPlaneOption = Some(cp), topicIdMapping = Map(disklessTopicPartition2.topic() -> disklessTopicPartition2.topicId()) ) } finally { @@ -6706,7 +6706,7 @@ class ReplicaManagerTest { val fetchHandlerCtor: MockedConstruction[FetchHandler] = mockFetchHandler(disklessResponse) val replicaManager = try { - createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp)) + createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp)) } finally { fetchHandlerCtor.close() } @@ -6765,7 +6765,7 @@ class ReplicaManagerTest { val replicaManager = try { // spy to inject readFromLog mock - spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp))) + spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp))) } finally { fetchHandlerCtor.close() } @@ -6845,7 +6845,7 @@ class ReplicaManagerTest { val replicaManager = try { // spy to inject readFromLog mock - spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp))) + spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp))) } finally { fetchHandlerCtor.close() } @@ -6919,7 +6919,7 @@ class ReplicaManagerTest { val fetchHandlerCtor: MockedConstruction[FetchHandler] = mockFetchHandler(disklessResponse) val replicaManager = try { - createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp)) + createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp)) } finally { fetchHandlerCtor.close() } @@ -6982,7 +6982,7 @@ class ReplicaManagerTest { val fetchHandlerCtor: MockedConstruction[FetchHandler] = mockFetchHandler(disklessResponse) val replicaManager = try { - spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp))) + spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp))) } finally { fetchHandlerCtor.close() } @@ -7057,7 +7057,7 @@ class ReplicaManagerTest { val fetchHandlerCtor: MockedConstruction[FetchHandler] = mockFetchHandler(disklessResponse) val replicaManager = try { - spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp))) + spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp))) } finally { 
fetchHandlerCtor.close() } @@ -7130,16 +7130,17 @@ class ReplicaManagerTest { private def createReplicaManager( disklessTopics: Seq[String], - controlPlane: Option[ControlPlane] = None, + controlPlaneOption: Option[ControlPlane] = None, topicIdMapping: Map[String, Uuid] = Map.empty ): ReplicaManager = { - val props = TestUtils.createBrokerConfig(1, logDirCount = 2) + val brokerId = 1 + val props = TestUtils.createBrokerConfig(brokerId, logDirCount = 2) val config = KafkaConfig.fromProps(props) val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(new Properties())) - val sharedState = mock(classOf[SharedState], Answers.RETURNS_DEEP_STUBS) - when(sharedState.time()).thenReturn(Time.SYSTEM) - when(sharedState.config()).thenReturn(new InklessConfig(new util.HashMap[String, Object]())) - when(sharedState.controlPlane()).thenReturn(controlPlane.getOrElse(mock(classOf[ControlPlane]))) + val inklessConfigProps = new util.HashMap[String, Object]() + inklessConfigProps.put(InklessConfig.CONSUME_BATCH_COORDINATE_CACHE_ENABLED_CONFIG, java.lang.Boolean.FALSE) + val inklessConfig = new InklessConfig(inklessConfigProps) + val controlPlane = controlPlaneOption.getOrElse(mock(classOf[ControlPlane])) val inklessMetadata = mock(classOf[MetadataView]) when(inklessMetadata.isDisklessTopic(any())).thenReturn(false) when(inklessMetadata.getTopicId(anyString())).thenAnswer{ invocation => @@ -7147,7 +7148,7 @@ class ReplicaManagerTest { topicIdMapping.getOrElse(topicName, Uuid.ZERO_UUID) } disklessTopics.foreach(t => when(inklessMetadata.isDisklessTopic(t)).thenReturn(true)) - when(sharedState.metadata()).thenReturn(inklessMetadata) + val sharedState = SharedState.initialize(time, brokerId, inklessConfig, inklessMetadata, controlPlane, new BrokerTopicStats(), () => new LogConfig(new Properties())) val logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) diff --git a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java index aa817b2bd4..76bf46c83a 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; @@ -30,6 +31,7 @@ import java.io.IOException; import java.time.Duration; import java.util.List; +import java.util.Optional; import java.util.function.Supplier; import io.aiven.inkless.cache.BatchCoordinateCache; @@ -52,6 +54,18 @@ public final class SharedState implements Closeable { private final InklessConfig config; private final MetadataView metadata; private final ControlPlane controlPlane; + private final StorageBackend fetchStorage; + // Separate storage client for lagging consumers to: + // 1. Isolate connection pool usage (lagging consumers shouldn't exhaust connections for hot path) + // 2. 
Allow independent tuning of timeouts/retries for cold storage access patterns + // (This requires some refactoring on how the storage client is built/configured) + private final Optional<StorageBackend> maybeLaggingFetchStorage; + private final StorageBackend produceStorage; + // backgroundStorage is shared by FileCleaner and FileMerger executors which run concurrently. + // Kafka storage backends are required to be thread-safe (they share the same Metrics instance). + // However, these tasks perform high-latency object storage calls and retries. A dedicated backend + // instance guarantees they don't contend with hot-path fetch/produce clients and prevents threading/double-close issues. + private final StorageBackend backgroundStorage; private final ObjectKeyCreator objectKeyCreator; private final KeyAlignmentStrategy keyAlignmentStrategy; private final ObjectCache cache; @@ -60,12 +74,17 @@ public final class SharedState implements Closeable { private final Supplier<LogConfig> defaultTopicConfigs; private final Metrics storageMetrics; - public SharedState( + private SharedState( final Time time, final int brokerId, final InklessConfig config, final MetadataView metadata, final ControlPlane controlPlane, + final StorageBackend fetchStorage, + final Optional<StorageBackend> maybeLaggingFetchStorage, + final StorageBackend produceStorage, + final StorageBackend backgroundStorage, + final Metrics storageMetrics, final ObjectKeyCreator objectKeyCreator, final KeyAlignmentStrategy keyAlignmentStrategy, final ObjectCache cache, @@ -84,12 +103,11 @@ public SharedState( this.batchCoordinateCache = batchCoordinateCache; this.brokerTopicStats = brokerTopicStats; this.defaultTopicConfigs = defaultTopicConfigs; - - final MetricsReporter reporter = new JmxReporter(); - this.storageMetrics = new Metrics( - new MetricConfig(), List.of(reporter), Time.SYSTEM, - new KafkaMetricsContext(STORAGE_METRIC_CONTEXT) - ); + this.fetchStorage = fetchStorage; + this.maybeLaggingFetchStorage = maybeLaggingFetchStorage; + this.produceStorage = produceStorage; + this.backgroundStorage = backgroundStorage; + this.storageMetrics = storageMetrics; } public static SharedState initialize( @@ -107,34 +125,91 @@ public static SharedState initialize( "Value of consume.batch.coordinate.cache.ttl.ms exceeds file.cleaner.retention.period.ms / 2" ); } - return new SharedState( - time, - brokerId, - config, - metadata, - controlPlane, - ObjectKey.creator(config.objectKeyPrefix(), config.objectKeyLogPrefixMasked()), - new FixedBlockAlignment(config.fetchCacheBlockBytes()), - new CaffeineCache( + + CaffeineCache objectCache = null; + BatchCoordinateCache batchCoordinateCache = null; + StorageBackend fetchStorage = null; + StorageBackend laggingFetchStorage = null; + StorageBackend produceStorage = null; + StorageBackend backgroundStorage = null; + Metrics storageMetrics = null; + try { + objectCache = new CaffeineCache( config.cacheMaxCount(), config.cacheExpirationLifespanSec(), config.cacheExpirationMaxIdleSec() - ), - config.isBatchCoordinateCacheEnabled() ? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) : new NullBatchCoordinateCache(), - brokerTopicStats, - defaultTopicConfigs - ); + ); + batchCoordinateCache = config.isBatchCoordinateCacheEnabled() + ?
new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) + : new NullBatchCoordinateCache(); + + final MetricsReporter reporter = new JmxReporter(); + storageMetrics = new Metrics( + new MetricConfig(), List.of(reporter), Time.SYSTEM, + new KafkaMetricsContext(STORAGE_METRIC_CONTEXT) + ); + fetchStorage = config.storage(storageMetrics); + // If the thread pool size is 0, lagging consumer support is disabled; don't create a separate client + // + // NOTE: The client for lagging consumers is created only when this SharedState + // is constructed. If fetchLaggingConsumerThreadPoolSize() is 0 at this time, no separate + // client is created and lagging consumer support is effectively disabled for the lifetime + // of this instance, even if the configuration is later reloaded with a non-zero value. + // Enabling lagging consumer support therefore requires a broker restart (or reconstruction + // of the SharedState) so that a new storage client can be created. + laggingFetchStorage = config.fetchLaggingConsumerThreadPoolSize() > 0 ? config.storage(storageMetrics) : null; + produceStorage = config.storage(storageMetrics); + backgroundStorage = config.storage(storageMetrics); + final var objectKeyCreator = ObjectKey.creator(config.objectKeyPrefix(), config.objectKeyLogPrefixMasked()); + final var keyAlignmentStrategy = new FixedBlockAlignment(config.fetchCacheBlockBytes()); + return new SharedState( + time, + brokerId, + config, + metadata, + controlPlane, + fetchStorage, + Optional.ofNullable(laggingFetchStorage), + produceStorage, + backgroundStorage, + storageMetrics, + objectKeyCreator, + keyAlignmentStrategy, + objectCache, + batchCoordinateCache, + brokerTopicStats, + defaultTopicConfigs + ); + } catch (Exception e) { + // Closing storage backends + Utils.closeQuietly(backgroundStorage, "backgroundStorage"); + Utils.closeQuietly(produceStorage, "produceStorage"); + Utils.closeQuietly(fetchStorage, "fetchStorage"); + Utils.closeQuietly(laggingFetchStorage, "laggingFetchStorage"); + // Closing storage metrics + Utils.closeQuietly(storageMetrics, "storageMetrics"); + // Closing caches + Utils.closeQuietly(batchCoordinateCache, "batchCoordinateCache"); + Utils.closeQuietly(objectCache, "objectCache"); + + throw new RuntimeException("Failed to initialize SharedState", e); + } } @Override public void close() throws IOException { - try { - cache.close(); - controlPlane.close(); - storageMetrics.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } + // Closing storage backends + Utils.closeQuietly(backgroundStorage, "backgroundStorage"); + Utils.closeQuietly(produceStorage, "produceStorage"); + maybeLaggingFetchStorage.ifPresent(s -> Utils.closeQuietly(s, "laggingFetchStorage")); + Utils.closeQuietly(fetchStorage, "fetchStorage"); + // Closing storage metrics + Utils.closeQuietly(storageMetrics, "storageMetrics"); + // Closing caches + Utils.closeQuietly(cache, "objectCache"); + Utils.closeQuietly(batchCoordinateCache, "batchCoordinateCache"); + // Closing control plane + Utils.closeQuietly(controlPlane, "controlPlane"); } public Time time() { @@ -185,7 +260,25 @@ public Supplier<LogConfig> defaultTopicConfigs() { return defaultTopicConfigs; } - public StorageBackend buildStorage() { - return config.storage(storageMetrics); + public StorageBackend fetchStorage() { + return fetchStorage; + } + + /** + * Optional access to the lagging fetch storage backend. + * + *
<p>
When {@code fetch.lagging.consumer.thread.pool.size == 0}, the lagging consumer + * path is explicitly disabled and this storage backend is not created.
</p>
+ */ + public Optional<StorageBackend> maybeLaggingFetchStorage() { + return maybeLaggingFetchStorage; + } + + public StorageBackend produceStorage() { + return produceStorage; + } + + public StorageBackend backgroundStorage() { + return backgroundStorage; + } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java index ee1c80c5ce..75caca6476 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java @@ -51,23 +51,11 @@ public FetchHandler(final SharedState state) { state.keyAlignmentStrategy(), state.cache(), state.controlPlane(), - state.buildStorage(), + state.fetchStorage(), state.brokerTopicStats(), state.config().fetchMetadataThreadPoolSize(), state.config().fetchDataThreadPoolSize(), - // Separate storage client for lagging consumers to: - // 1. Isolate connection pool usage (lagging consumers shouldn't exhaust connections for hot path) - // 2. Allow independent tuning of timeouts/retries for cold storage access patterns - // (This requires some refactoring on how the storage client is built/configured) - // If thread pool size is 0, disabling lagging consumer support, don't create a separate client - // - // NOTE: The client for lagging consumers is created only when this FetchHandler (and Reader) - // is constructed. If fetchLaggingConsumerThreadPoolSize() is 0 at this time, no separate - // client is created and lagging consumer support is effectively disabled for the lifetime - // of this instance, even if the configuration is later reloaded with a non-zero value. - // Enabling lagging consumer support therefore requires a broker restart (or reconstruction - // of the SharedState/FetchHandler) so that a new storage client can be created. - state.config().fetchLaggingConsumerThreadPoolSize() > 0 ?
state.buildStorage() : null, + state.maybeLaggingFetchStorage(), state.config().fetchLaggingConsumerThresholdMs(), state.config().fetchLaggingConsumerRequestRateLimit(), state.config().fetchLaggingConsumerThreadPoolSize(), @@ -76,7 +64,8 @@ public FetchHandler(final SharedState state) { ); } - public FetchHandler(final Reader reader) { + // Visible for testing + FetchHandler(final Reader reader) { this.reader = reader; } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java index 80228c6811..26024f93ec 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java @@ -33,6 +33,7 @@ import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -51,6 +52,7 @@ import io.aiven.inkless.control_plane.ControlPlane; import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; +import io.aiven.inkless.storage_backend.common.StorageBackend; import io.github.bucket4j.Bandwidth; import io.github.bucket4j.Bucket; @@ -107,7 +109,7 @@ public Reader( BrokerTopicStats brokerTopicStats, int fetchMetadataThreadPoolSize, int fetchDataThreadPoolSize, - ObjectFetcher laggingObjectFetcher, + Optional<StorageBackend> maybeLaggingObjectFetcher, long laggingConsumerThresholdMs, int laggingConsumerRequestRateLimit, int laggingConsumerThreadPoolSize, @@ -125,33 +127,34 @@ public Reader( Executors.newFixedThreadPool(fetchDataThreadPoolSize, new InklessThreadFactory("inkless-fetch-data-", false)), // Only create lagging consumer fetcher when feature is enabled (pool size > 0). // A pool size of 0 is a valid configuration that explicitly disables the feature (null fetcher and executor). - laggingConsumerThreadPoolSize > 0 ? laggingObjectFetcher : null, + maybeLaggingObjectFetcher, laggingConsumerThresholdMs, laggingConsumerRequestRateLimit, // Only create lagging consumer resources when feature is enabled (pool size > 0). // A pool size of 0 is a valid configuration that explicitly disables the feature // by passing both a null executor and a null laggingObjectFetcher. - laggingConsumerThreadPoolSize > 0 - ? createBoundedThreadPool(laggingConsumerThreadPoolSize) - : null, + maybeCreateBoundedThreadPool(laggingConsumerThreadPoolSize), new InklessFetchMetrics(time, cache), brokerTopicStats ); } - private static ExecutorService createBoundedThreadPool(int poolSize) { + private static Optional<ExecutorService> maybeCreateBoundedThreadPool(int poolSize) { // Creates a bounded thread pool for lagging consumer fetch requests. // Fixed pool design: all threads persist for executor lifetime (never removed when idle).
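+ // Worked example (illustrative; assumes LAGGING_CONSUMER_QUEUE_MULTIPLIER = 10, a value not shown in this hunk):
+ // poolSize = 4 gives queueCapacity = 40, so at most 4 fetches run while up to 40 wait; a submission arriving
+ // once all 44 slots are taken fails fast with RejectedExecutionException (AbortPolicy below) instead of
+ // blocking the request handler thread.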
+ if (poolSize == 0) return Optional.empty(); final int queueCapacity = poolSize * LAGGING_CONSUMER_QUEUE_MULTIPLIER; - return new ThreadPoolExecutor( - poolSize, // corePoolSize: fixed pool, always this many threads - poolSize, // maximumPoolSize: no dynamic scaling (core == max) - 0L, // keepAliveTime: unused for fixed pools (core threads don't time out) - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(queueCapacity), // Bounded queue prevents OOM - new InklessThreadFactory("inkless-fetch-lagging-consumer-", false), - // Why AbortPolicy: CallerRunsPolicy would block request handler threads causing broker-wide degradation - new ThreadPoolExecutor.AbortPolicy() // Reject when full, don't block callers + return Optional.of( + new ThreadPoolExecutor( + poolSize, // corePoolSize: fixed pool, always this many threads + poolSize, // maximumPoolSize: no dynamic scaling (core == max) + 0L, // keepAliveTime: unused for fixed pools (core threads don't time out) + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(queueCapacity), // Bounded queue prevents OOM + new InklessThreadFactory("inkless-fetch-lagging-consumer-", false), + // Why AbortPolicy: CallerRunsPolicy would block request handler threads causing broker-wide degradation + new ThreadPoolExecutor.AbortPolicy() // Reject when full, don't block callers + ) ); } @@ -166,10 +169,10 @@ private static ExecutorService createBoundedThreadPool(int poolSize) { int maxBatchesPerPartitionToFind, ExecutorService metadataExecutor, ExecutorService fetchDataExecutor, - ObjectFetcher laggingConsumerObjectFetcher, + Optional<StorageBackend> maybeLaggingConsumerObjectFetcher, long laggingConsumerThresholdMs, int laggingConsumerRequestRateLimit, - ExecutorService laggingFetchDataExecutor, + Optional<ExecutorService> maybeLaggingFetchDataExecutor, InklessFetchMetrics fetchMetrics, BrokerTopicStats brokerTopicStats ) { @@ -182,25 +185,31 @@ private static ExecutorService createBoundedThreadPool(int poolSize) { this.maxBatchesPerPartitionToFind = maxBatchesPerPartitionToFind; this.metadataExecutor = metadataExecutor; this.fetchDataExecutor = fetchDataExecutor; - this.laggingFetchDataExecutor = laggingFetchDataExecutor; this.laggingConsumerThresholdMs = laggingConsumerThresholdMs; - this.laggingConsumerObjectFetcher = laggingConsumerObjectFetcher; // Validate that lagging consumer resources are consistently configured: // both executor and fetcher must be null (feature disabled) or both must be non-null (feature enabled). // This ensures fail-fast behavior rather than silent runtime failure. - if ((laggingFetchDataExecutor == null) != (laggingConsumerObjectFetcher == null)) { + if (maybeLaggingFetchDataExecutor.isPresent() != maybeLaggingConsumerObjectFetcher.isPresent()) { throw new IllegalArgumentException( "Lagging consumer feature requires both laggingFetchDataExecutor and laggingConsumerObjectFetcher " + "to be non-null (feature enabled) or both to be null (feature disabled). " - + "Found: executor=" + (laggingFetchDataExecutor != null ? "non-null" : "null") - + ", fetcher=" + (laggingConsumerObjectFetcher != null ? "non-null" : "null") + + "Found: executor=" + (maybeLaggingFetchDataExecutor.isPresent() ? "non-null" : "null") + + ", fetcher=" + (maybeLaggingConsumerObjectFetcher.isPresent() ? "non-null" : "null") ); } + // Unwrap Optional to nullable fields. This is safe because: + // 1. Validation above ensures both are consistently present or absent + // 2. Internal code (FetchPlanner) uses efficient null checks: laggingExecutor != null && laggingFetcher != null + // 3.
Null checks are already present throughout the codebase for feature detection + // 4. Using Optional in internal fields/parameters would add overhead in the hot fetch path + this.laggingFetchDataExecutor = maybeLaggingFetchDataExecutor.orElse(null); + this.laggingConsumerObjectFetcher = maybeLaggingConsumerObjectFetcher.orElse(null); + // Initialize rate limiter only if lagging consumer feature is enabled (executor exists) and rate limit > 0 // This avoids creating unused objects when the feature is disabled - if (laggingFetchDataExecutor != null && laggingConsumerRequestRateLimit > 0) { + if (this.laggingFetchDataExecutor != null && laggingConsumerRequestRateLimit > 0) { // Rate limiter configuration: // - capacity = rateLimit: Allows initial burst up to full rate limit (e.g., 200 tokens for 200 req/s) // - refillGreedy: Refills at rateLimit tokens per second @@ -423,8 +432,6 @@ public void close() throws IOException { if (metadataThreadPoolMonitor != null) metadataThreadPoolMonitor.close(); if (dataThreadPoolMonitor != null) dataThreadPoolMonitor.close(); if (laggingConsumerThreadPoolMonitor != null) laggingConsumerThreadPoolMonitor.close(); - objectFetcher.close(); - if (laggingConsumerObjectFetcher != null) laggingConsumerObjectFetcher.close(); fetchMetrics.close(); } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/delete/DeleteRecordsInterceptor.java b/storage/inkless/src/main/java/io/aiven/inkless/delete/DeleteRecordsInterceptor.java index 09635b3801..299403c926 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/delete/DeleteRecordsInterceptor.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/delete/DeleteRecordsInterceptor.java @@ -36,27 +36,31 @@ import io.aiven.inkless.common.SharedState; import io.aiven.inkless.common.TopicIdEnricher; import io.aiven.inkless.common.TopicTypeCounter; +import io.aiven.inkless.control_plane.ControlPlane; import io.aiven.inkless.control_plane.DeleteRecordsRequest; import io.aiven.inkless.control_plane.DeleteRecordsResponse; +import io.aiven.inkless.control_plane.MetadataView; import static org.apache.kafka.common.requests.DeleteRecordsResponse.INVALID_LOW_WATERMARK; public class DeleteRecordsInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(DeleteRecordsInterceptor.class); - private final SharedState state; + private final ControlPlane controlPlane; + private final MetadataView metadataView; private final Executor executor; private final TopicTypeCounter topicTypeCounter; public DeleteRecordsInterceptor(final SharedState state) { - this(state, Executors.newCachedThreadPool()); + this(state.controlPlane(), state.metadata(), Executors.newCachedThreadPool()); } // Visible for testing. 
- DeleteRecordsInterceptor(final SharedState state, final Executor executor) { - this.state = state; + DeleteRecordsInterceptor(final ControlPlane controlPlane, final MetadataView metadataView, final Executor executor) { + this.controlPlane = controlPlane; this.executor = executor; - this.topicTypeCounter = new TopicTypeCounter(this.state.metadata()); + this.metadataView = metadataView; + this.topicTypeCounter = new TopicTypeCounter(metadataView); } /** @@ -82,7 +86,7 @@ public boolean intercept(final Map offsetPerPartition, final Map offsetPerPartitionEnriched; try { - offsetPerPartitionEnriched = TopicIdEnricher.enrich(state.metadata(), offsetPerPartition); + offsetPerPartitionEnriched = TopicIdEnricher.enrich(metadataView, offsetPerPartition); } catch (final TopicIdEnricher.TopicIdNotFoundException e) { LOGGER.error("Cannot find UUID for topic {}", e.topicName); respondAllWithError(offsetPerPartition, responseCallback, Errors.UNKNOWN_SERVER_ERROR); @@ -95,7 +99,7 @@ public boolean intercept(final Map offsetPerPartition, final List requests = offsetPerPartitionEnriched.entrySet().stream() .map(kv -> new DeleteRecordsRequest(kv.getKey(), kv.getValue())) .toList(); - final List responses = state.controlPlane().deleteRecords(requests); + final List responses = controlPlane.deleteRecords(requests); final Map result = new HashMap<>(); for (int i = 0; i < responses.size(); i++) { final DeleteRecordsRequest request = requests.get(i); diff --git a/storage/inkless/src/main/java/io/aiven/inkless/delete/FileCleaner.java b/storage/inkless/src/main/java/io/aiven/inkless/delete/FileCleaner.java index 87022ab36f..16af9348ab 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/delete/FileCleaner.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/delete/FileCleaner.java @@ -63,7 +63,7 @@ public FileCleaner(SharedState sharedState) { this( sharedState.time(), sharedState.controlPlane(), - sharedState.buildStorage(), + sharedState.backgroundStorage(), sharedState.objectKeyCreator(), sharedState.config().fileCleanerRetentionPeriod() ); @@ -143,7 +143,7 @@ private void cleanFiles(Set objectKeyPaths) throws StorageBackendExcepti @Override public void close() throws IOException { - storage.close(); + // SharedState owns the storage backend lifecycle; only close component metrics here. metrics.close(); } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/merge/FileMerger.java b/storage/inkless/src/main/java/io/aiven/inkless/merge/FileMerger.java index 7bce9f39ed..b45bf77746 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/merge/FileMerger.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/merge/FileMerger.java @@ -67,12 +67,31 @@ public class FileMerger implements Runnable, Closeable { private final AtomicInteger attempts = new AtomicInteger(); public FileMerger(final SharedState sharedState) { - this.brokerId = sharedState.brokerId(); - this.time = sharedState.time(); - this.config = sharedState.config(); - this.controlPlane = sharedState.controlPlane(); - this.storage = sharedState.buildStorage(); - this.objectKeyCreator = sharedState.objectKeyCreator(); + this( + sharedState.time(), + sharedState.config(), + sharedState.controlPlane(), + sharedState.backgroundStorage(), + sharedState.objectKeyCreator(), + sharedState.brokerId(), + sharedState.config().fileMergeWorkDir() + ); + } + + // Visible for testing. 
+ FileMerger(final Time time, + final InklessConfig config, + final ControlPlane controlPlane, + final StorageBackend storage, + final ObjectKeyCreator objectKeyCreator, + final int brokerId, + final Path workDir) { + this.time = time; + this.config = config; + this.controlPlane = controlPlane; + this.storage = storage; + this.objectKeyCreator = objectKeyCreator; + this.brokerId = brokerId; this.metrics = new FileMergerMetrics(); // This backoff is needed only for jitter, there's no exponent in it. @@ -80,7 +99,7 @@ public FileMerger(final SharedState sharedState) { final var noWorkBackoff = new ExponentialBackoff(noWorkBackoffDuration, 1, noWorkBackoffDuration * 2, 0.2); noWorkBackoffSupplier = () -> noWorkBackoff.backoff(1); - this.workDir = config.fileMergeWorkDir(); + this.workDir = workDir; } @Override @@ -239,7 +258,7 @@ private void tryDeleteFile(ObjectKey objectKey, Exception e) { @Override public void close() throws IOException { - storage.close(); + // Storage backend is managed by SharedState; avoid double-closing shared clients. metrics.close(); } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java index c54670709e..53afcff96d 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java @@ -56,7 +56,7 @@ public AppendHandler(final SharedState state) { state.time(), state.brokerId(), state.objectKeyCreator(), - state.buildStorage(), + state.produceStorage(), state.keyAlignmentStrategy(), state.cache(), state.batchCoordinateCache(), diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java index 7096393d4d..a85c8d9847 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java @@ -70,7 +70,6 @@ class Writer implements Closeable { private final Lock lock = new ReentrantLock(); private ActiveFile activeFile; - private final StorageBackend storage; private final FileCommitter fileCommitter; private final Time time; private final Duration commitInterval; @@ -103,12 +102,19 @@ class Writer implements Closeable { commitInterval, maxBufferSize, Executors.newScheduledThreadPool(1, new InklessThreadFactory("inkless-file-commit-ticker-", true)), - storage, new FileCommitter( - brokerId, controlPlane, objectKeyCreator, storage, - keyAlignmentStrategy, objectCache, batchCoordinateCache, time, - maxFileUploadAttempts, fileUploadRetryBackoff, - fileUploaderThreadPoolSize), + brokerId, + controlPlane, + objectKeyCreator, + storage, + keyAlignmentStrategy, + objectCache, + batchCoordinateCache, + time, + maxFileUploadAttempts, + fileUploadRetryBackoff, + fileUploaderThreadPoolSize + ), new WriterMetrics(time), brokerTopicStats ); @@ -119,7 +125,6 @@ class Writer implements Closeable { final Duration commitInterval, final int maxBufferSize, final ScheduledExecutorService commitTickScheduler, - final StorageBackend storage, final FileCommitter fileCommitter, final WriterMetrics writerMetrics, final BrokerTopicStats brokerTopicStats) { @@ -130,7 +135,6 @@ class Writer implements Closeable { } this.maxBufferSize = maxBufferSize; this.commitTickScheduler = Objects.requireNonNull(commitTickScheduler, "commitTickScheduler cannot be null"); - this.storage = Objects.requireNonNull(storage, "storage cannot be 
null"); this.fileCommitter = Objects.requireNonNull(fileCommitter, "fileCommitter cannot be null"); this.writerMetrics = Objects.requireNonNull(writerMetrics, "writerMetrics cannot be null"); this.brokerTopicStats = brokerTopicStats; @@ -227,7 +231,6 @@ public void close() throws IOException { // Rotate file before closing the uploader so the file gets into the queue first. rotateFile(true); fileCommitter.close(); - storage.close(); writerMetrics.close(); } finally { lock.unlock(); diff --git a/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorage.java b/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorage.java index d88e0d2ec5..577fce56e6 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorage.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorage.java @@ -18,7 +18,6 @@ package io.aiven.inkless.storage_backend.in_memory; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.utils.Time; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -47,8 +46,8 @@ public final class InMemoryStorage extends StorageBackend { private final ConcurrentHashMap storage = new ConcurrentHashMap<>(); // needed for reflection based instantiation - public InMemoryStorage() { - super(new Metrics(Time.SYSTEM)); + public InMemoryStorage(final Metrics metrics) { + super(metrics); } @Override diff --git a/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java b/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java new file mode 100644 index 0000000000..9ee948a7b4 --- /dev/null +++ b/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java @@ -0,0 +1,122 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ +package io.aiven.inkless.common; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; + +import io.aiven.inkless.config.InklessConfig; +import io.aiven.inkless.control_plane.ControlPlane; +import io.aiven.inkless.control_plane.MetadataView; +import io.aiven.inkless.storage_backend.common.StorageBackend; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) +class SharedStateTest { + + @Mock + InklessConfig config; + + @Mock + MetadataView metadataView; + + @Mock + ControlPlane controlPlane; + + @Mock + BrokerTopicStats brokerTopicStats; + + @Mock + StorageBackend firstBackend; + + @Mock + StorageBackend secondBackend; + + @Mock + StorageBackend thirdBackend; + + @Mock + StorageBackend fourthBackend; + + @BeforeEach + void setupConfig() { + when(config.fileCleanerRetentionPeriod()).thenReturn(Duration.ofMillis(2000)); + when(config.isBatchCoordinateCacheEnabled()).thenReturn(true); + when(config.batchCoordinateCacheTtl()).thenReturn(Duration.ofMillis(100)); + when(config.cacheMaxCount()).thenReturn(10L); + when(config.cacheExpirationLifespanSec()).thenReturn(30); + when(config.cacheExpirationMaxIdleSec()).thenReturn(10); + when(config.fetchLaggingConsumerThreadPoolSize()).thenReturn(1); + } + + @Test + void shouldCloseResourcesInReverseOrderOnFailure() throws Exception { + final AtomicInteger storageCallCount = new AtomicInteger(); + + when(config.storage(any(Metrics.class))).thenAnswer(invocation -> { + int count = storageCallCount.incrementAndGet(); + if (count == 3) { + throw new RuntimeException("Failure creating third storage"); + } + return switch (count) { + case 1 -> firstBackend; + case 2 -> secondBackend; + case 4 -> fourthBackend; + default -> thirdBackend; + }; + }); + + assertThatThrownBy(() -> SharedState.initialize( + Time.SYSTEM, + 1, + config, + metadataView, + controlPlane, + brokerTopicStats, + () -> mock(LogConfig.class) + )).isInstanceOf(RuntimeException.class) + .hasMessageContaining("Failed to initialize SharedState"); + + final InOrder inOrder = inOrder(firstBackend, secondBackend, thirdBackend, fourthBackend); + inOrder.verify(firstBackend).close(); + inOrder.verify(secondBackend).close(); + inOrder.verify(thirdBackend, times(0)).close(); + inOrder.verify(fourthBackend, times(0)).close(); + } +} diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java index cfbcc6d80e..d9c993647d 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java @@ -70,7 +70,7 @@ import 
io.aiven.inkless.control_plane.FindBatchRequest; import io.aiven.inkless.control_plane.FindBatchResponse; import io.aiven.inkless.generated.FileExtent; -import io.aiven.inkless.storage_backend.common.ObjectFetcher; +import io.aiven.inkless.storage_backend.common.StorageBackend; import io.aiven.inkless.storage_backend.common.StorageBackendException; import static java.util.concurrent.TimeUnit.SECONDS; @@ -109,7 +109,7 @@ public class ReaderTest { @Mock private ControlPlane controlPlane; @Mock - private ObjectFetcher objectFetcher; + private StorageBackend objectFetcher; @Mock private FetchParams fetchParams; @Mock @@ -133,7 +133,7 @@ public void testClose() throws Exception { reader.close(); verify(metadataExecutor, atLeastOnce()).shutdown(); verify(fetchDataExecutor, atLeastOnce()).shutdown(); - verify(laggingFetchDataExecutor, atLeastOnce()).shutdown(); + // laggingFetchDataExecutor is not used by getReader() which uses Optional.empty() } @Nested @@ -690,10 +690,10 @@ private Reader getReader() { 0, metadataExecutor, fetchDataExecutor, - objectFetcher, + Optional.empty(), Long.MAX_VALUE, 0, - laggingFetchDataExecutor, + Optional.empty(), fetchMetrics, new BrokerTopicStats()); } @@ -716,6 +716,9 @@ class RateLimitingTests { private ExecutorService fetchDataExecutor; private ExecutorService laggingFetchDataExecutor; + @Mock + private StorageBackend laggingObjectFetcher; + @BeforeEach public void setup() { metadataExecutor = Executors.newSingleThreadExecutor(); @@ -766,8 +769,8 @@ public void testRateLimitingWithLoad() throws Exception { .thenReturn(List.of(oldResponse)); final ReadableByteChannel channel = mock(ReadableByteChannel.class); - when(objectFetcher.fetch(any(), any())).thenReturn(channel); - when(objectFetcher.readToByteBuffer(channel)).thenReturn(records.buffer()); + when(laggingObjectFetcher.fetch(any(), any())).thenReturn(channel); + when(laggingObjectFetcher.readToByteBuffer(channel)).thenReturn(records.buffer()); try (final var reader = new Reader( time, @@ -779,10 +782,10 @@ public void testRateLimitingWithLoad() throws Exception { 0, metadataExecutor, fetchDataExecutor, - objectFetcher, + Optional.of(laggingObjectFetcher), LAGGING_THRESHOLD_MS, RATE_LIMIT_REQ_PER_SEC, - laggingFetchDataExecutor, + Optional.of(laggingFetchDataExecutor), fetchMetrics, new BrokerTopicStats())) { @@ -845,8 +848,8 @@ public void testRateLimitingDisabled() throws Exception { .thenReturn(List.of(oldResponse)); final ReadableByteChannel channel = mock(ReadableByteChannel.class); - when(objectFetcher.fetch(any(), any())).thenReturn(channel); - when(objectFetcher.readToByteBuffer(channel)).thenReturn(records.buffer()); + when(laggingObjectFetcher.fetch(any(), any())).thenReturn(channel); + when(laggingObjectFetcher.readToByteBuffer(channel)).thenReturn(records.buffer()); try (final var reader = new Reader( time, @@ -858,10 +861,10 @@ public void testRateLimitingDisabled() throws Exception { 0, metadataExecutor, fetchDataExecutor, - objectFetcher, + Optional.of(laggingObjectFetcher), LAGGING_THRESHOLD_MS, 0, // Rate limiting disabled - laggingFetchDataExecutor, + Optional.of(laggingFetchDataExecutor), fetchMetrics, new BrokerTopicStats())) { @@ -993,10 +996,10 @@ public void testMixedLaggingAndRecentPartitions() throws Exception { 0, testMetadataExecutor, testFetchDataExecutor, - objectFetcher, // Use same fetcher for lagging to simplify test + Optional.of(laggingObjectFetcher), LAGGING_THRESHOLD_MS, RATE_LIMIT_REQ_PER_SEC, - saturatedLaggingExecutor, // Saturated executor for lagging path + 
Optional.of(saturatedLaggingExecutor), // Saturated executor for lagging path fetchMetrics, new BrokerTopicStats())) { @@ -1088,10 +1091,10 @@ public void testRecentDataBypassesRateLimiting() throws Exception { 0, metadataExecutor, fetchDataExecutor, - objectFetcher, + Optional.of(objectFetcher), LAGGING_THRESHOLD_MS, RATE_LIMIT_REQ_PER_SEC, - laggingFetchDataExecutor, + Optional.of(laggingFetchDataExecutor), fetchMetrics, new BrokerTopicStats())) { diff --git a/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java b/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java index 43e197dbd2..7ee6ec50aa 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java @@ -22,10 +22,6 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.DeleteRecordsResponseData; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.storage.internals.log.LogConfig; -import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -36,22 +32,10 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import java.time.Duration; import java.util.List; import java.util.Map; import java.util.function.Consumer; -import java.util.function.Supplier; - -import io.aiven.inkless.cache.BatchCoordinateCache; -import io.aiven.inkless.cache.CaffeineBatchCoordinateCache; -import io.aiven.inkless.cache.FixedBlockAlignment; -import io.aiven.inkless.cache.KeyAlignmentStrategy; -import io.aiven.inkless.cache.NullCache; -import io.aiven.inkless.cache.ObjectCache; -import io.aiven.inkless.common.ObjectKey; -import io.aiven.inkless.common.ObjectKeyCreator; -import io.aiven.inkless.common.SharedState; -import io.aiven.inkless.config.InklessConfig; + import io.aiven.inkless.control_plane.ControlPlane; import io.aiven.inkless.control_plane.DeleteRecordsRequest; import io.aiven.inkless.control_plane.DeleteRecordsResponse; @@ -71,25 +55,12 @@ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) class DeleteRecordsInterceptorTest { - static final int BROKER_ID = 11; - static final ObjectKeyCreator OBJECT_KEY_CREATOR = ObjectKey.creator("", false); - private static final KeyAlignmentStrategy KEY_ALIGNMENT_STRATEGY = new FixedBlockAlignment(Integer.MAX_VALUE); - private static final ObjectCache OBJECT_CACHE = new NullCache(); - private static final BatchCoordinateCache BATCH_COORDINATE_CACHE = new CaffeineBatchCoordinateCache(Duration.ofSeconds(30)); - - static final Supplier DEFAULT_TOPIC_CONFIGS = () -> new LogConfig(Map.of()); - - Time time = new MockTime(); - @Mock - InklessConfig disklessConfig; @Mock MetadataView metadataView; @Mock ControlPlane controlPlane; @Mock Consumer> responseCallback; - @Mock - BrokerTopicStats brokerTopicStats; @Captor ArgumentCaptor> resultCaptor; @@ -100,20 +71,8 @@ class DeleteRecordsInterceptorTest { public void mixingDisklessAndClassicTopicsIsNotAllowed() { when(metadataView.isDisklessTopic(eq("diskless"))).thenReturn(true); when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false); - final SharedState state = new SharedState( - time, - BROKER_ID, - disklessConfig, - metadataView, - controlPlane, - 
OBJECT_KEY_CREATOR, - KEY_ALIGNMENT_STRATEGY, - OBJECT_CACHE, - BATCH_COORDINATE_CACHE, - brokerTopicStats, - DEFAULT_TOPIC_CONFIGS - ); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(state); + + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(controlPlane, metadataView, new SynchronousExecutor()); final Map entriesPerPartition = Map.of( new TopicPartition("diskless", 0), @@ -144,9 +103,8 @@ public void mixingDisklessAndClassicTopicsIsNotAllowed() { @Test public void notInterceptDeletingRecordsFromClassicTopics() { when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS)); + + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(controlPlane, metadataView, new SynchronousExecutor()); final Map entriesPerPartition = Map.of( new TopicPartition("non_diskless", 0), 4567L @@ -181,10 +139,8 @@ public void interceptDeletingRecordsFromDisklessTopics() { } }); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(controlPlane, metadataView, new SynchronousExecutor()); final TopicPartition tp0 = new TopicPartition("diskless", 0); final TopicPartition tp1 = new TopicPartition("diskless", 1); @@ -224,10 +180,7 @@ public void controlPlaneException() { when(controlPlane.deleteRecords(anyList())).thenThrow(new RuntimeException("test")); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(controlPlane, metadataView, new SynchronousExecutor()); final TopicPartition topicPartition = new TopicPartition("diskless", 1); final Map entriesPerPartition = Map.of( @@ -265,10 +218,7 @@ public void topicIdNotFound() { } }); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(controlPlane, metadataView, new SynchronousExecutor()); final TopicPartition topicPartition1 = new TopicPartition("diskless1", 1); final TopicPartition topicPartition2 = new TopicPartition("diskless2", 2); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java b/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java index 94938f471a..221bd56242 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java +++ 
b/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java @@ -41,18 +41,14 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.io.TempDir; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import java.io.IOException; -import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -100,8 +96,6 @@ @Testcontainers @Tag("integration") class FileCleanerIntegrationTest { - private static final Logger LOGGER = LoggerFactory.getLogger(FileCleanerIntegrationTest.class); - @Container static final MinioContainer MINIO = S3TestContainer.minio(); @@ -134,8 +128,6 @@ class FileCleanerIntegrationTest { MetadataView metadataView; @Mock Supplier defaultTopicConfigs; - @TempDir - Path logDir; ControlPlane controlPlane; SharedState sharedState; diff --git a/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerMockedTest.java b/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerMockedTest.java index a9bbd67f52..c083d7de81 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerMockedTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerMockedTest.java @@ -41,6 +41,7 @@ import io.aiven.inkless.control_plane.FileToDelete; import io.aiven.inkless.storage_backend.common.StorageBackend; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -58,6 +59,14 @@ class FileCleanerMockedTest { static final ObjectKeyCreator OBJECT_KEY_CREATOR = ObjectKey.creator("", false); + @Test + void close() throws Exception { + final var cleaner = new FileCleaner(time, controlPlane, storageBackend, OBJECT_KEY_CREATOR, RETENTION_PERIOD); + cleaner.close(); + // storage is shared across background jobs and is managed by SharedState. 
+ verify(storageBackend, never()).close(); + } + @Test void empty() throws Exception { final var cleaner = new FileCleaner(time, controlPlane, storageBackend, OBJECT_KEY_CREATOR, RETENTION_PERIOD); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java index bafd7b95ec..e0d526dcec 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java @@ -39,7 +39,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; @@ -154,8 +153,6 @@ static void tearDownS3() { ControlPlane controlPlane; SharedState sharedState; - @TempDir - Path logDir; @BeforeEach void setup() { @@ -271,10 +268,9 @@ private void writeRecords(final AppendHandler appendHandler) { } CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).orTimeout(60, TimeUnit.SECONDS); - futures.forEach(response -> { - response.join() - .forEach((tp, partitionResponse) -> assertThat(partitionResponse.error).isEqualTo(Errors.NONE)); - }); + futures.forEach(response -> + response.join().forEach((tp, partitionResponse) -> + assertThat(partitionResponse.error).isEqualTo(Errors.NONE))); } private Map getHighWatermarks(final ControlPlane controlPlane) { diff --git a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java index 436a522b48..fa13cb2d94 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java @@ -21,10 +21,8 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -46,12 +44,11 @@ import java.time.Duration; import java.time.Instant; import java.util.List; -import java.util.function.Supplier; import io.aiven.inkless.common.ObjectFormat; import io.aiven.inkless.common.ObjectKey; +import io.aiven.inkless.common.ObjectKeyCreator; import io.aiven.inkless.common.PlainObjectKey; -import io.aiven.inkless.common.SharedState; import io.aiven.inkless.config.InklessConfig; import io.aiven.inkless.control_plane.BatchInfo; import io.aiven.inkless.control_plane.BatchMetadata; @@ -59,7 +56,6 @@ import io.aiven.inkless.control_plane.ControlPlaneException; import io.aiven.inkless.control_plane.FileMergeWorkItem; import io.aiven.inkless.control_plane.MergedFileBatch; -import io.aiven.inkless.control_plane.MetadataView; import io.aiven.inkless.storage_backend.common.StorageBackend; import io.aiven.inkless.storage_backend.common.StorageBackendException; @@ -96,6 +92,7 @@ class FileMergerMockedTest { static final TopicIdPartition T1P0 = new TopicIdPartition(TOPIC_ID_1, 0, TOPIC_1); static final TopicIdPartition T1P1 = new TopicIdPartition(TOPIC_ID_1, 1, TOPIC_1); public static final 
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java
index 436a522b48..fa13cb2d94 100644
--- a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java
+++ b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java
@@ -21,10 +21,8 @@
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -46,12 +44,11 @@
 import java.time.Duration;
 import java.time.Instant;
 import java.util.List;
-import java.util.function.Supplier;
 
 import io.aiven.inkless.common.ObjectFormat;
 import io.aiven.inkless.common.ObjectKey;
+import io.aiven.inkless.common.ObjectKeyCreator;
 import io.aiven.inkless.common.PlainObjectKey;
-import io.aiven.inkless.common.SharedState;
 import io.aiven.inkless.config.InklessConfig;
 import io.aiven.inkless.control_plane.BatchInfo;
 import io.aiven.inkless.control_plane.BatchMetadata;
@@ -59,7 +56,6 @@
 import io.aiven.inkless.control_plane.ControlPlaneException;
 import io.aiven.inkless.control_plane.FileMergeWorkItem;
 import io.aiven.inkless.control_plane.MergedFileBatch;
-import io.aiven.inkless.control_plane.MetadataView;
 import io.aiven.inkless.storage_backend.common.StorageBackend;
 import io.aiven.inkless.storage_backend.common.StorageBackendException;
@@ -96,6 +92,7 @@ class FileMergerMockedTest {
     static final TopicIdPartition T1P0 = new TopicIdPartition(TOPIC_ID_1, 0, TOPIC_1);
     static final TopicIdPartition T1P1 = new TopicIdPartition(TOPIC_ID_1, 1, TOPIC_1);
     public static final Path WORK_DIR = Path.of("/tmp/inkless/file-merge");
+    public static final ObjectKeyCreator OBJECT_KEY_CREATOR = ObjectKey.creator("prefix", false);
 
     @Mock
     Time time;
@@ -110,28 +107,23 @@ class FileMergerMockedTest {
     @Captor
     ArgumentCaptor sleepCaptor;
 
-    SharedState sharedState;
-
-    @BeforeEach
-    void setup() {
-        when(inklessConfig.objectKeyPrefix()).thenReturn("prefix");
-        when(inklessConfig.fileMergeWorkDir()).thenReturn(WORK_DIR);
-        when(inklessConfig.cacheMaxCount()).thenReturn(10000L);
-
-        sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, mock(MetadataView.class), controlPlane,
-            mock(BrokerTopicStats.class), mock(Supplier.class));
-    }
-
     @AfterEach
     void tearDown() {
         assertThat(WORK_DIR).isEmptyDirectory();
     }
 
+    @Test
+    void close() throws Exception {
+        final FileMerger fileMerger = new FileMerger(time, inklessConfig, controlPlane, storage, OBJECT_KEY_CREATOR, BROKER_ID, WORK_DIR);
+        fileMerger.close();
+        // storage is shared across background jobs and is managed by SharedState.
+        verify(storage, never()).close();
+    }
+
     @Test
     void singleFileSingleBatch() throws StorageBackendException, IOException {
         when(inklessConfig.produceMaxUploadAttempts()).thenReturn(1);
         when(inklessConfig.produceUploadBackoff()).thenReturn(Duration.ZERO);
-        when(inklessConfig.storage(any())).thenReturn(storage);
 
         final String obj1 = "obj1";
@@ -167,7 +159,7 @@ void singleFileSingleBatch() throws StorageBackendException, IOException {
             new FileMergeWorkItem(WORK_ITEM_ID, Instant.ofEpochMilli(1234), List.of(file1InWorkItem))
         );
 
-        final FileMerger fileMerger = new FileMerger(sharedState);
+        final FileMerger fileMerger = new FileMerger(time, inklessConfig, controlPlane, storage, OBJECT_KEY_CREATOR, BROKER_ID, WORK_DIR);
         fileMerger.run();
 
         verify(storage).fetch(PlainObjectKey.create("", obj1), null);
@@ -187,7 +179,6 @@
     void twoFilesWithGaps(final boolean directFileOrder, final boolean directBatchOrder) throws StorageBackendException, IOException {
         when(inklessConfig.produceMaxUploadAttempts()).thenReturn(1);
         when(inklessConfig.produceUploadBackoff()).thenReturn(Duration.ZERO);
-        when(inklessConfig.storage(any())).thenReturn(storage);
 
         final String obj1 = "obj1";
         final String obj2 = "obj2";
@@ -291,7 +282,7 @@ void twoFilesWithGaps(final boolean directFileOrder, final boolean directBatchOrder)
             new FileMergeWorkItem(WORK_ITEM_ID, Instant.ofEpochMilli(1234), files)
         );
 
-        final FileMerger fileMerger = new FileMerger(sharedState);
+        final FileMerger fileMerger = new FileMerger(time, inklessConfig, controlPlane, storage, OBJECT_KEY_CREATOR, BROKER_ID, WORK_DIR);
         fileMerger.run();
 
         verify(storage).fetch(PlainObjectKey.create("", obj1), null);
@@ -307,7 +298,7 @@
     void mustSleepWhenNoWorkItem() {
         when(controlPlane.getFileMergeWorkItem()).thenReturn(null);
 
-        final FileMerger fileMerger = new FileMerger(sharedState);
+        final FileMerger fileMerger = new FileMerger(time, inklessConfig, controlPlane, storage, OBJECT_KEY_CREATOR, BROKER_ID, WORK_DIR);
         fileMerger.run();
         verify(time).sleep(sleepCaptor.capture());
         assertThat(sleepCaptor.getValue()).isBetween((long) (10_000L * 0.8), (long) (20_000L * 1.2));
@@ -317,8 +308,6 @@ void mustSleepWhenNoWorkItem() {
 
     @Test
     void errorInReading() throws Exception {
-        when(inklessConfig.storage(any())).thenReturn(storage);
-
         final String obj1 = "obj1";
         final long batch1Id = 1;
         when(storage.fetch(any(ObjectKey.class), isNull()))
@@ -331,7 +320,7 @@ void errorInReading() throws Exception {
             new FileMergeWorkItem(WORK_ITEM_ID, Instant.ofEpochMilli(1234), List.of(file1InWorkItem))
         );
 
-        final FileMerger fileMerger = new FileMerger(sharedState);
+        final FileMerger fileMerger = new FileMerger(time, inklessConfig, controlPlane, storage, OBJECT_KEY_CREATOR, BROKER_ID, WORK_DIR);
         fileMerger.run();
 
         verify(controlPlane).releaseFileMergeWorkItem(eq(WORK_ITEM_ID));
@@ -345,7 +334,6 @@
 
     @Test
     void errorInWriting() throws Exception {
-        when(inklessConfig.storage(any())).thenReturn(storage);
         when(inklessConfig.produceMaxUploadAttempts()).thenReturn(1);
         when(inklessConfig.produceUploadBackoff()).thenReturn(Duration.ZERO);
@@ -373,7 +361,7 @@ void errorInWriting() throws Exception {
             new FileMergeWorkItem(WORK_ITEM_ID, Instant.ofEpochMilli(1234), List.of(file1InWorkItem))
         );
 
-        final FileMerger fileMerger = new FileMerger(sharedState);
+        final FileMerger fileMerger = new FileMerger(time, inklessConfig, controlPlane, storage, OBJECT_KEY_CREATOR, BROKER_ID, WORK_DIR);
         fileMerger.run();
 
         verify(controlPlane).releaseFileMergeWorkItem(eq(WORK_ITEM_ID));
@@ -387,7 +375,6 @@
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void errorInCommittingFromControlPlane(boolean isSafeToDelete) throws Exception {
-        when(inklessConfig.storage(any())).thenReturn(storage);
         when(inklessConfig.produceMaxUploadAttempts()).thenReturn(1);
         when(inklessConfig.produceUploadBackoff()).thenReturn(Duration.ZERO);
@@ -418,7 +405,7 @@ void errorInCommittingFromControlPlane(boolean isSafeToDelete) throws Exception
             .when(controlPlane).commitFileMergeWorkItem(anyLong(), anyString(), any(), anyInt(), anyLong(), any());
         when(controlPlane.isSafeToDeleteFile(anyString())).thenReturn(isSafeToDelete);
 
-        final FileMerger fileMerger = new FileMerger(sharedState);
+        final FileMerger fileMerger = new FileMerger(time, inklessConfig, controlPlane, storage, OBJECT_KEY_CREATOR, BROKER_ID, WORK_DIR);
         fileMerger.run();
 
         verify(controlPlane).releaseFileMergeWorkItem(eq(WORK_ITEM_ID));
@@ -431,7 +418,6 @@
 
     @Test
     void errorInCommittingNotFromControlPlane() throws Exception {
-        when(inklessConfig.storage(any())).thenReturn(storage);
         when(inklessConfig.produceMaxUploadAttempts()).thenReturn(1);
         when(inklessConfig.produceUploadBackoff()).thenReturn(Duration.ZERO);
@@ -460,7 +446,7 @@ void errorInCommittingNotFromControlPlane() throws Exception {
         doThrow(new RuntimeException("test"))
             .when(controlPlane).commitFileMergeWorkItem(anyLong(), anyString(), any(), anyInt(), anyLong(), any());
 
-        final FileMerger fileMerger = new FileMerger(sharedState);
+        final FileMerger fileMerger = new FileMerger(time, inklessConfig, controlPlane, storage, OBJECT_KEY_CREATOR, BROKER_ID, WORK_DIR);
         fileMerger.run();
 
         verify(controlPlane).releaseFileMergeWorkItem(eq(WORK_ITEM_ID));
@@ -481,5 +467,4 @@ private byte[] concat(final byte[]... arrays) {
             throw new RuntimeException(e);
         }
     }
-
 }
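With the SharedState-based constructor gone, the mocked merger tests build FileMerger from its collaborators directly, so no @BeforeEach stubbing of objectKeyPrefix, fileMergeWorkDir, or cacheMaxCount is needed just to construct the object. A minimal sketch of that style, under the assumption that bare Mockito mocks are enough to construct the merger (individual tests still stub InklessConfig as needed):

import static org.mockito.Mockito.mock;

import java.nio.file.Path;

import org.apache.kafka.common.utils.Time;

import io.aiven.inkless.common.ObjectKey;
import io.aiven.inkless.common.ObjectKeyCreator;
import io.aiven.inkless.config.InklessConfig;
import io.aiven.inkless.control_plane.ControlPlane;
import io.aiven.inkless.merge.FileMerger;
import io.aiven.inkless.storage_backend.common.StorageBackend;

class FileMergerConstructionSketch {
    static FileMerger newMergerForTest() {
        final ObjectKeyCreator keyCreator = ObjectKey.creator("prefix", false);
        return new FileMerger(
            mock(Time.class),           // mocked so sleep/backoff can be asserted
            mock(InklessConfig.class),  // stubbed per test (upload attempts, backoff)
            mock(ControlPlane.class),   // hands out FileMergeWorkItems
            mock(StorageBackend.class), // borrowed client; the merger never closes it
            keyCreator,
            11,                         // broker id
            Path.of("/tmp/inkless/file-merge"));
    }
}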
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java
index 6fda9ad1e8..e1dc6bfbe2 100644
--- a/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java
+++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java
@@ -40,20 +40,11 @@
 import org.mockito.quality.Strictness;
 
 import java.io.IOException;
-import java.time.Duration;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
-import io.aiven.inkless.cache.BatchCoordinateCache;
-import io.aiven.inkless.cache.CaffeineBatchCoordinateCache;
-import io.aiven.inkless.cache.FixedBlockAlignment;
-import io.aiven.inkless.cache.KeyAlignmentStrategy;
-import io.aiven.inkless.cache.NullCache;
-import io.aiven.inkless.cache.ObjectCache;
-import io.aiven.inkless.common.ObjectKey;
-import io.aiven.inkless.common.ObjectKeyCreator;
 import io.aiven.inkless.common.SharedState;
 import io.aiven.inkless.config.InklessConfig;
 import io.aiven.inkless.control_plane.ControlPlane;
@@ -71,10 +62,6 @@ @MockitoSettings(strictness = Strictness.STRICT_STUBS)
 public class AppendHandlerTest {
     static final int BROKER_ID = 11;
-    static final ObjectKeyCreator OBJECT_KEY_CREATOR = ObjectKey.creator("", false);
-    private static final KeyAlignmentStrategy KEY_ALIGNMENT_STRATEGY = new FixedBlockAlignment(Integer.MAX_VALUE);
-    private static final ObjectCache OBJECT_CACHE = new NullCache();
-    private static final BatchCoordinateCache BATCH_COORDINATE_CACHE = new CaffeineBatchCoordinateCache(Duration.ofSeconds(30));
     static final Supplier DEFAULT_TOPIC_CONFIGS = () -> new LogConfig(Map.of());
@@ -113,9 +100,11 @@ public class AppendHandlerTest {
     @Test
     public void rejectTransactionalProduce() throws Exception {
-        try (final AppendHandler interceptor = new AppendHandler(
-            new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane,
-                OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) {
+        try (
+            final SharedState sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, metadataView, controlPlane,
+                brokerTopicStats, DEFAULT_TOPIC_CONFIGS);
+            final AppendHandler interceptor = new AppendHandler(sharedState, writer)
+        ) {
             final TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, "inkless1");
             final TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), 0, "inkless2");
@@ -137,9 +126,11 @@ topicIdPartition2, new PartitionResponse(Errors.INVALID_REQUEST)
 
     @Test
     public void emptyRequests() throws Exception {
-        try (final AppendHandler interceptor = new AppendHandler(
-            new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane,
-                OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) {
+        try (
+            final SharedState sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, metadataView, controlPlane,
+                brokerTopicStats, DEFAULT_TOPIC_CONFIGS);
+            final AppendHandler interceptor = new AppendHandler(sharedState, writer)
+        ) {
             final Map entriesPerPartition = Map.of();
@@ -165,9 +156,11 @@ topicIdPartition, new PartitionResponse(Errors.NONE)
         );
         when(metadataView.getTopicConfig(any())).thenReturn(new Properties());
 
-        try (final AppendHandler interceptor = new AppendHandler(
-            new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane,
-                OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) {
+        try (
+            final SharedState sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, metadataView, controlPlane,
+                brokerTopicStats, DEFAULT_TOPIC_CONFIGS);
+            final AppendHandler interceptor = new AppendHandler(sharedState, writer)
+        ) {
             final var result = interceptor.handle(entriesPerPartition, requestLocal).get();
 
             assertThat(result).isEqualTo(writeResult);
@@ -188,21 +181,24 @@ public void writeFutureFailed() throws Exception {
         );
         when(metadataView.getTopicConfig(any())).thenReturn(new Properties());
 
-        try (final AppendHandler interceptor = new AppendHandler(
-            new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane,
-                OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) {
+        try (
+            final SharedState sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, metadataView, controlPlane,
+                brokerTopicStats, DEFAULT_TOPIC_CONFIGS);
+            final AppendHandler interceptor = new AppendHandler(sharedState, writer)
+        ) {
             assertThatThrownBy(() -> interceptor.handle(entriesPerPartition, requestLocal).get()).hasCause(exception);
         }
     }
 
     @Test
     public void close() throws IOException {
-        final AppendHandler interceptor = new AppendHandler(
-            new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane,
-                OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer);
+        try (final SharedState sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, metadataView, controlPlane,
+            brokerTopicStats, DEFAULT_TOPIC_CONFIGS)) {
+            final AppendHandler interceptor = new AppendHandler(sharedState, writer);
 
-        interceptor.close();
+            interceptor.close();
 
-        verify(writer).close();
+            verify(writer).close();
+        }
     }
 }
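The AppendHandler tests now obtain SharedState through its initialize factory and scope both resources in try-with-resources, so the state is closed even when an assertion throws. A condensed sketch of the pattern with the signatures shown in this diff; whether bare mocks satisfy the config reads inside initialize is an assumption (the merger tests above stubbed objectKeyPrefix, fileMergeWorkDir, and cacheMaxCount for exactly that reason):

import static org.mockito.Mockito.mock;

import java.util.Map;
import java.util.function.Supplier;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;

import io.aiven.inkless.common.SharedState;
import io.aiven.inkless.config.InklessConfig;
import io.aiven.inkless.control_plane.ControlPlane;
import io.aiven.inkless.control_plane.MetadataView;
import io.aiven.inkless.produce.AppendHandler;
import io.aiven.inkless.produce.Writer;

class AppendHandlerLifecycleSketch {
    static final Supplier<LogConfig> DEFAULT_TOPIC_CONFIGS = () -> new LogConfig(Map.of());

    void run() throws Exception {
        try (
            // initialize builds the derived pieces (key creator, caches, storage)
            // internally; the mocked InklessConfig is assumed to return usable defaults.
            final SharedState sharedState = SharedState.initialize(
                Time.SYSTEM, 11, mock(InklessConfig.class), mock(MetadataView.class),
                mock(ControlPlane.class), new BrokerTopicStats(), DEFAULT_TOPIC_CONFIGS);
            final AppendHandler handler = new AppendHandler(sharedState, mock(Writer.class))
        ) {
            // exercise handler.handle(...) here; resources close in reverse order.
        }
    }
}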
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterMockedTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterMockedTest.java
index 1a22decfea..0a765057d1 100644
--- a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterMockedTest.java
+++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterMockedTest.java
@@ -110,7 +110,7 @@ void setup() {
     @Test
     void tickWithEmptyFile() throws InterruptedException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);
 
         writer.tick();
@@ -121,7 +121,7 @@
     @Test
     void tickIsScheduledWhenFileIsWrittenTo() {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);
 
         final Map writeRequest = Map.of(
             T0P0, recordCreator.create(T0P0.topicPartition(), 100)
@@ -136,7 +136,7 @@ void committingDueToOverfillWithFirstRequest() throws InterruptedException {
         when(time.nanoseconds()).thenReturn(10_000_000L);
 
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 15908, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 15908, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);
 
         final Map writeRequest = Map.of(
             T0P0, recordCreator.create(T0P0.topicPartition(), 100),
@@ -156,7 +156,7 @@
     @Test
     void committingDueToOverfillBeforeLastRequest() throws InterruptedException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);
 
         final Map writeRequest0 = Map.of(
             T0P0, recordCreator.create(T0P0.topicPartition(), 1),
@@ -189,7 +189,7 @@
     @Test
     void committingDueToOverfillBeforeAfterLastRequest() throws InterruptedException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);
 
         final Map writeRequest0 = Map.of(
             T0P0, recordCreator.create(T0P0.topicPartition(), 1),
@@ -220,7 +220,7 @@
     @Test
     void committingOnTick() throws InterruptedException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);
 
         final Map writeRequest = Map.of(
             T0P0, recordCreator.create(T0P0.topicPartition(), 1),
@@ -241,7 +241,7 @@
     @Test
     void committingDueToClose() throws InterruptedException, IOException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);
 
         final Map writeRequest = Map.of(
             T0P0, recordCreator.create(T0P0.topicPartition(), 1),
@@ -262,7 +262,7 @@
     @Test
     void writeAfterRotation() throws InterruptedException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);
 
         final Map writeRequest0 = Map.of(
             T0P0, recordCreator.create(T0P0.topicPartition(), 100),
@@ -290,7 +290,7 @@
     @Test
     void close() throws IOException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);
         reset(commitTickScheduler);
 
         writer.close();
@@ -302,7 +302,7 @@
     @Test
     void closeAfterClose() throws IOException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);
 
         writer.close();
         reset(commitTickScheduler);
@@ -319,7 +319,7 @@
     @Test
     void tickAfterClose() throws IOException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);
 
         writer.close();
         reset(commitTickScheduler);
@@ -336,7 +336,7 @@
     @Test
     void writeAfterClose() throws IOException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);
 
         writer.close();
         reset(commitTickScheduler);
         reset(fileCommitter);
@@ -358,7 +358,7 @@
     @Test
     void commitInterrupted() throws InterruptedException, IOException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);
 
         final InterruptedException interruptedException = new InterruptedException();
         doThrow(interruptedException).when(fileCommitter).commit(any());
@@ -381,35 +381,32 @@
     @Test
     void constructorInvalidArguments() {
         assertThatThrownBy(() -> new Writer(
-            null, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats))
+            null, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats))
             .isInstanceOf(NullPointerException.class)
             .hasMessage("time cannot be null");
         assertThatThrownBy(() -> new Writer(
-            time, null, 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats))
+            time, null, 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats))
             .isInstanceOf(NullPointerException.class)
             .hasMessage("commitInterval cannot be null");
         assertThatThrownBy(() -> new Writer(
-            time, Duration.ofMillis(1), 0, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats))
+            time, Duration.ofMillis(1), 0, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats))
             .isInstanceOf(IllegalArgumentException.class)
             .hasMessage("maxBufferSize must be positive");
         assertThatThrownBy(() ->
-            new Writer(time, Duration.ofMillis(1), 8 * 1024, null, storage, fileCommitter, writerMetrics, brokerTopicStats))
+            new Writer(time, Duration.ofMillis(1), 8 * 1024, null, fileCommitter, writerMetrics, brokerTopicStats))
             .isInstanceOf(NullPointerException.class)
             .hasMessage("commitTickScheduler cannot be null");
-        assertThatThrownBy(() -> new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, null, brokerTopicStats))
+        assertThatThrownBy(() -> new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, null, brokerTopicStats))
             .isInstanceOf(NullPointerException.class)
             .hasMessage("writerMetrics cannot be null");
-        assertThatThrownBy(() -> new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, null, writerMetrics, brokerTopicStats))
+        assertThatThrownBy(() -> new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, null, writerMetrics, brokerTopicStats))
             .isInstanceOf(NullPointerException.class)
             .hasMessage("fileCommitter cannot be null");
-        assertThatThrownBy(() -> new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, null, fileCommitter, writerMetrics, brokerTopicStats))
-            .isInstanceOf(NullPointerException.class)
-            .hasMessage("storage cannot be null");
     }
 
     @Test
     void writeNull() {
-        final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+        final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);
 
         assertThatThrownBy(() -> writer.write(null, TOPIC_CONFIGS, REQUEST_LOCAL))
             .isInstanceOf(NullPointerException.class)
@@ -424,7 +421,7 @@
     @Test
     void writeEmptyRequests() {
-        final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+        final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);
 
         assertThatThrownBy(() -> writer.write(Map.of(), TOPIC_CONFIGS, REQUEST_LOCAL))
             .isInstanceOf(IllegalArgumentException.class)
@@ -433,7 +430,7 @@
     @Test
     void entriesTopicConfigMismatch() {
-        final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+        final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);
 
         assertThatThrownBy(() -> writer.write(Map.of(T0P0, MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(new byte[10]))), Map.of(TOPIC_1, new LogConfig(Map.of())), REQUEST_LOCAL))
             .isInstanceOf(IllegalArgumentException.class)
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java
index e273fbbad8..aa65232fe5 100644
--- a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java
+++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java
@@ -248,7 +248,6 @@ void test(final int requestCount,
             Duration.ofMillis(commitIntervalMsAvg), // it doesn't matter as the scheduling doesn't happen
             maxBufferSize,
             mock(ScheduledExecutorService.class),
-            storage,
             fileCommitter,
             mock(WriterMetrics.class),
             new BrokerTopicStats()
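Every Writer construction above drops the storage argument, and constructorInvalidArguments drops the matching "storage cannot be null" check, leaving fileCommitter as the writer's only route to object storage. A sketch of the resulting seven-argument constructor, with argument roles inferred from the null-check messages; the import locations for FileCommitter and WriterMetrics are assumptions (the tests suggest they live alongside Writer in io.aiven.inkless.produce):

import static org.mockito.Mockito.mock;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;

import io.aiven.inkless.produce.FileCommitter;
import io.aiven.inkless.produce.Writer;
import io.aiven.inkless.produce.WriterMetrics;

class WriterConstructionSketch {
    static Writer newWriterForTest() {
        return new Writer(
            Time.SYSTEM,                           // "time cannot be null"
            Duration.ofMillis(1),                  // commitInterval
            8 * 1024,                              // maxBufferSize, must be positive
            mock(ScheduledExecutorService.class),  // commitTickScheduler
            mock(FileCommitter.class),             // the only remaining write path
            mock(WriterMetrics.class),             // writerMetrics
            new BrokerTopicStats());
    }
}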
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorageTest.java b/storage/inkless/src/test/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorageTest.java
index dd9a0a496b..dceab46204 100644
--- a/storage/inkless/src/test/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorageTest.java
+++ b/storage/inkless/src/test/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorageTest.java
@@ -17,6 +17,8 @@
  */
 package io.aiven.inkless.storage_backend.in_memory;
 
+import org.apache.kafka.common.metrics.Metrics;
+
 import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayInputStream;
@@ -36,10 +38,9 @@
 class InMemoryStorageTest {
     static final PlainObjectKey OBJECT_KEY = PlainObjectKey.create("a", "b");
-
     @Test
     void fetchNulls() {
-        final InMemoryStorage storage = new InMemoryStorage();
+        final InMemoryStorage storage = new InMemoryStorage(new Metrics());
 
         assertThatThrownBy(() -> storage.fetch(null, new ByteRange(0, 10)))
             .isInstanceOf(NullPointerException.class)
             .hasMessage("key cannot be null");
@@ -50,7 +51,7 @@ void fetchNulls() {
 
     @Test
     void deleteNulls() {
-        final InMemoryStorage storage = new InMemoryStorage();
+        final InMemoryStorage storage = new InMemoryStorage(new Metrics());
 
         assertThatThrownBy(() -> storage.delete((ObjectKey) null))
             .isInstanceOf(NullPointerException.class)
             .hasMessage("key cannot be null");
@@ -61,14 +62,14 @@
 
     @Test
     void fetchNonExistent() {
-        final InMemoryStorage storage = new InMemoryStorage();
+        final InMemoryStorage storage = new InMemoryStorage(new Metrics());
 
         assertThatThrownBy(() -> storage.fetch(OBJECT_KEY, ByteRange.maxRange()))
             .isInstanceOf(KeyNotFoundException.class);
     }
 
     @Test
     void uploadAndFetch() throws StorageBackendException, IOException {
-        final InMemoryStorage storage = new InMemoryStorage();
+        final InMemoryStorage storage = new InMemoryStorage(new Metrics());
 
         final byte[] data = new byte[10];
         storage.upload(OBJECT_KEY, new ByteArrayInputStream(data), data.length);
@@ -79,7 +80,7 @@
     @Test
     void fetchRanged() throws StorageBackendException, IOException {
-        final InMemoryStorage storage = new InMemoryStorage();
+        final InMemoryStorage storage = new InMemoryStorage(new Metrics());
 
         final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
         storage.upload(OBJECT_KEY, new ByteArrayInputStream(data), data.length);
@@ -92,7 +93,7 @@
     @Test
     void fetchOutsideOfSize() throws StorageBackendException {
-        final InMemoryStorage storage = new InMemoryStorage();
+        final InMemoryStorage storage = new InMemoryStorage(new Metrics());
 
         final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
         storage.upload(OBJECT_KEY, new ByteArrayInputStream(data), data.length);
@@ -103,7 +104,7 @@
     @Test
     void delete() throws StorageBackendException, IOException {
-        final InMemoryStorage storage = new InMemoryStorage();
+        final InMemoryStorage storage = new InMemoryStorage(new Metrics());
 
         final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
         storage.upload(OBJECT_KEY, new ByteArrayInputStream(data), data.length);
@@ -118,7 +119,7 @@
     @Test
     void deleteMany() throws StorageBackendException, IOException {
-        final InMemoryStorage storage = new InMemoryStorage();
+        final InMemoryStorage storage = new InMemoryStorage(new Metrics());
 
         final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
         storage.upload(OBJECT_KEY, new ByteArrayInputStream(data), data.length);
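InMemoryStorage now takes a Kafka Metrics registry in place of its former no-arg constructor. A minimal round-trip sketch under that new signature; the only assumption beyond this diff is that the caller owns the Metrics instance and closes it:

import java.io.ByteArrayInputStream;

import org.apache.kafka.common.metrics.Metrics;

import io.aiven.inkless.common.PlainObjectKey;
import io.aiven.inkless.storage_backend.in_memory.InMemoryStorage;

class InMemoryStorageSketch {
    void roundTrip() throws Exception {
        try (Metrics metrics = new Metrics()) { // caller-owned registry, closed here
            final InMemoryStorage storage = new InMemoryStorage(metrics);
            final byte[] data = {0, 1, 2, 3};
            // upload signature as exercised by the tests above
            storage.upload(PlainObjectKey.create("a", "b"), new ByteArrayInputStream(data), data.length);
        }
    }
}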