Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.mockito.{Answers, ArgumentCaptor, ArgumentMatchers, MockedConstruction}
import org.mockito.{ArgumentCaptor, ArgumentMatchers, MockedConstruction}

import java.io.{ByteArrayInputStream, File}
import java.net.InetAddress
Expand Down Expand Up @@ -6495,7 +6495,7 @@ class ReplicaManagerTest {
}

@Test
def testAppendWithInvalidDisklessAndValidCLassic(): Unit = {
def testAppendWithInvalidDisklessAndValidClassic(): Unit = {
val entriesPerPartition = Map(
disklessTopicPartition -> RECORDS,
classicTopicPartition -> RECORDS,
Expand Down Expand Up @@ -6643,7 +6643,7 @@ class ReplicaManagerTest {
val replicaManager = try {
createReplicaManager(
List(disklessTopicPartition.topic(), disklessTopicPartition2.topic()),
controlPlane = Some(cp),
controlPlaneOption = Some(cp),
topicIdMapping = Map(disklessTopicPartition2.topic() -> disklessTopicPartition2.topicId())
)
} finally {
Expand Down Expand Up @@ -6706,7 +6706,7 @@ class ReplicaManagerTest {
val fetchHandlerCtor: MockedConstruction[FetchHandler] = mockFetchHandler(disklessResponse)

val replicaManager = try {
createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp))
createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp))
} finally {
fetchHandlerCtor.close()
}
Expand Down Expand Up @@ -6765,7 +6765,7 @@ class ReplicaManagerTest {

val replicaManager = try {
// spy to inject readFromLog mock
spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp)))
spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp)))
} finally {
fetchHandlerCtor.close()
}
Expand Down Expand Up @@ -6845,7 +6845,7 @@ class ReplicaManagerTest {

val replicaManager = try {
// spy to inject readFromLog mock
spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp)))
spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp)))
} finally {
fetchHandlerCtor.close()
}
Expand Down Expand Up @@ -6919,7 +6919,7 @@ class ReplicaManagerTest {
val fetchHandlerCtor: MockedConstruction[FetchHandler] = mockFetchHandler(disklessResponse)

val replicaManager = try {
createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp))
createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp))
} finally {
fetchHandlerCtor.close()
}
Expand Down Expand Up @@ -6982,7 +6982,7 @@ class ReplicaManagerTest {
val fetchHandlerCtor: MockedConstruction[FetchHandler] = mockFetchHandler(disklessResponse)

val replicaManager = try {
spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp)))
spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp)))
} finally {
fetchHandlerCtor.close()
}
Expand Down Expand Up @@ -7057,7 +7057,7 @@ class ReplicaManagerTest {
val fetchHandlerCtor: MockedConstruction[FetchHandler] = mockFetchHandler(disklessResponse)

val replicaManager = try {
spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp)))
spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlaneOption = Some(cp)))
} finally {
fetchHandlerCtor.close()
}
Expand Down Expand Up @@ -7130,24 +7130,25 @@ class ReplicaManagerTest {

private def createReplicaManager(
disklessTopics: Seq[String],
controlPlane: Option[ControlPlane] = None,
controlPlaneOption: Option[ControlPlane] = None,
topicIdMapping: Map[String, Uuid] = Map.empty
): ReplicaManager = {
val props = TestUtils.createBrokerConfig(1, logDirCount = 2)
val brokerId = 1
val props = TestUtils.createBrokerConfig(brokerId, logDirCount = 2)
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(new Properties()))
val sharedState = mock(classOf[SharedState], Answers.RETURNS_DEEP_STUBS)
when(sharedState.time()).thenReturn(Time.SYSTEM)
when(sharedState.config()).thenReturn(new InklessConfig(new util.HashMap[String, Object]()))
when(sharedState.controlPlane()).thenReturn(controlPlane.getOrElse(mock(classOf[ControlPlane])))
val inklessConfigProps = new util.HashMap[String, Object]()
inklessConfigProps.put(InklessConfig.CONSUME_BATCH_COORDINATE_CACHE_ENABLED_CONFIG, java.lang.Boolean.FALSE)
val inklessConfig = new InklessConfig(inklessConfigProps)
val controlPlane = controlPlaneOption.getOrElse(mock(classOf[ControlPlane]))
val inklessMetadata = mock(classOf[MetadataView])
when(inklessMetadata.isDisklessTopic(any())).thenReturn(false)
when(inklessMetadata.getTopicId(anyString())).thenAnswer{ invocation =>
val topicName = invocation.getArgument(0, classOf[String])
topicIdMapping.getOrElse(topicName, Uuid.ZERO_UUID)
}
disklessTopics.foreach(t => when(inklessMetadata.isDisklessTopic(t)).thenReturn(true))
when(sharedState.metadata()).thenReturn(inklessMetadata)
val sharedState = SharedState.initialize(time, brokerId, inklessConfig, inklessMetadata, controlPlane, new BrokerTopicStats(), () => new LogConfig(new Properties()))

val logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

Expand Down
153 changes: 123 additions & 30 deletions storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

import io.aiven.inkless.cache.BatchCoordinateCache;
Expand All @@ -52,6 +54,18 @@ public final class SharedState implements Closeable {
private final InklessConfig config;
private final MetadataView metadata;
private final ControlPlane controlPlane;
private final StorageBackend fetchStorage;
// Separate storage client for lagging consumers to:
// 1. Isolate connection pool usage (lagging consumers shouldn't exhaust connections for hot path)
// 2. Allow independent tuning of timeouts/retries for cold storage access patterns
// (This requires some refactoring on how the storage client is built/configured)
private final Optional<StorageBackend> maybeLaggingFetchStorage;
private final StorageBackend produceStorage;
// backgroundStorage is shared by FileCleaner and FileMerger executors which run concurrently.
// Kafka storage backends are required to be thread-safe (they share the same Metrics instance).
// However, these tasks perform high-latency object storage calls and retries. A dedicated backend
// instance guarantees they don't contend with hot-path fetch/produce clients and prevents threading/double-close issues.
private final StorageBackend backgroundStorage;
private final ObjectKeyCreator objectKeyCreator;
private final KeyAlignmentStrategy keyAlignmentStrategy;
private final ObjectCache cache;
Expand All @@ -60,12 +74,17 @@ public final class SharedState implements Closeable {
private final Supplier<LogConfig> defaultTopicConfigs;
private final Metrics storageMetrics;

public SharedState(
private SharedState(
final Time time,
final int brokerId,
final InklessConfig config,
final MetadataView metadata,
final ControlPlane controlPlane,
final StorageBackend fetchStorage,
final Optional<StorageBackend> maybeLaggingFetchStorage,
final StorageBackend produceStorage,
final StorageBackend backgroundStorage,
final Metrics storageMetrics,
final ObjectKeyCreator objectKeyCreator,
final KeyAlignmentStrategy keyAlignmentStrategy,
final ObjectCache cache,
Expand All @@ -84,12 +103,11 @@ public SharedState(
this.batchCoordinateCache = batchCoordinateCache;
this.brokerTopicStats = brokerTopicStats;
this.defaultTopicConfigs = defaultTopicConfigs;

final MetricsReporter reporter = new JmxReporter();
this.storageMetrics = new Metrics(
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext(STORAGE_METRIC_CONTEXT)
);
this.fetchStorage = fetchStorage;
this.maybeLaggingFetchStorage = maybeLaggingFetchStorage;
this.produceStorage = produceStorage;
this.backgroundStorage = backgroundStorage;
this.storageMetrics = storageMetrics;
}

public static SharedState initialize(
Expand All @@ -107,34 +125,91 @@ public static SharedState initialize(
"Value of consume.batch.coordinate.cache.ttl.ms exceeds file.cleaner.retention.period.ms / 2"
);
}
return new SharedState(
time,
brokerId,
config,
metadata,
controlPlane,
ObjectKey.creator(config.objectKeyPrefix(), config.objectKeyLogPrefixMasked()),
new FixedBlockAlignment(config.fetchCacheBlockBytes()),
new CaffeineCache(

CaffeineCache objectCache = null;
BatchCoordinateCache batchCoordinateCache = null;
StorageBackend fetchStorage = null;
StorageBackend laggingFetchStorage = null;
StorageBackend produceStorage = null;
StorageBackend backgroundStorage = null;
Metrics storageMetrics = null;
try {
objectCache = new CaffeineCache(
config.cacheMaxCount(),
config.cacheExpirationLifespanSec(),
config.cacheExpirationMaxIdleSec()
),
config.isBatchCoordinateCacheEnabled() ? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) : new NullBatchCoordinateCache(),
brokerTopicStats,
defaultTopicConfigs
);
);
batchCoordinateCache = config.isBatchCoordinateCacheEnabled()
? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl())
: new NullBatchCoordinateCache();

final MetricsReporter reporter = new JmxReporter();
storageMetrics = new Metrics(
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext(STORAGE_METRIC_CONTEXT)
);
fetchStorage = config.storage(storageMetrics);
// If the lagging consumer thread pool size is 0, lagging consumer support is disabled, so no separate client is created.
//
// NOTE: The client for lagging consumers is created only when this SharedState
// is constructed. If fetchLaggingConsumerThreadPoolSize() is 0 at this time, no separate
// client is created and lagging consumer support is effectively disabled for the lifetime
// of this instance, even if the configuration is later reloaded with a non-zero value.
// Enabling lagging consumer support therefore requires a broker restart (or reconstruction
// of the SharedState) so that a new storage client can be created.
laggingFetchStorage = config.fetchLaggingConsumerThreadPoolSize() > 0 ? config.storage(storageMetrics) : null;
produceStorage = config.storage(storageMetrics);
backgroundStorage = config.storage(storageMetrics);
final var objectKeyCreator = ObjectKey.creator(config.objectKeyPrefix(), config.objectKeyLogPrefixMasked());
final var keyAlignmentStrategy = new FixedBlockAlignment(config.fetchCacheBlockBytes());
return new SharedState(
time,
brokerId,
config,
metadata,
controlPlane,
fetchStorage,
Optional.ofNullable(laggingFetchStorage),
produceStorage,
backgroundStorage,
storageMetrics,
objectKeyCreator,
keyAlignmentStrategy,
objectCache,
batchCoordinateCache,
brokerTopicStats,
defaultTopicConfigs
);
} catch (Exception e) {
// Closing storage backends
Utils.closeQuietly(backgroundStorage, "backgroundStorage");
Utils.closeQuietly(produceStorage, "produceStorage");
Utils.closeQuietly(fetchStorage, "fetchStorage");
Utils.closeQuietly(laggingFetchStorage, "laggingFetchStorage");
// Closing storage metrics
Utils.closeQuietly(storageMetrics, "storageMetrics");
// Closing caches
Utils.closeQuietly(batchCoordinateCache, "batchCoordinateCache");
Utils.closeQuietly(objectCache, "objectCache");

throw new RuntimeException("Failed to initialize SharedState", e);
}
}

@Override
public void close() throws IOException {
try {
cache.close();
controlPlane.close();
storageMetrics.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
// Closing storage backends
Utils.closeQuietly(backgroundStorage, "backgroundStorage");
Utils.closeQuietly(produceStorage, "produceStorage");
maybeLaggingFetchStorage.ifPresent(s -> Utils.closeQuietly(s, "laggingFetchStorage"));
Utils.closeQuietly(fetchStorage, "fetchStorage");
// Closing storage metrics
Utils.closeQuietly(storageMetrics, "storageMetrics");
// Closing caches
Utils.closeQuietly(cache, "objectCache");
Utils.closeQuietly(batchCoordinateCache, "batchCoordinateCache");
// Closing control plane
Utils.closeQuietly(controlPlane, "controlPlane");
}

public Time time() {
Expand Down Expand Up @@ -185,7 +260,25 @@ public Supplier<LogConfig> defaultTopicConfigs() {
return defaultTopicConfigs;
}

public StorageBackend buildStorage() {
return config.storage(storageMetrics);
/**
 * Storage backend handed out for the regular fetch path.
 *
 * <p>The instance is built once in {@code initialize} from
 * {@code config.storage(storageMetrics)} and is released by {@link #close()}.</p>
 *
 * @return the fetch storage backend held by this shared state
 */
public StorageBackend fetchStorage() {
    return this.fetchStorage;
}

/**
 * Storage backend reserved for lagging consumers, if one was created.
 *
 * <p>The backend only exists when {@code fetch.lagging.consumer.thread.pool.size} was
 * greater than zero at initialization time; with a value of {@code 0} the lagging
 * consumer path is disabled and the returned {@link Optional} is empty.</p>
 *
 * @return the lagging fetch storage backend, or an empty {@link Optional} when disabled
 */
public Optional<StorageBackend> maybeLaggingFetchStorage() {
    return this.maybeLaggingFetchStorage;
}

/**
 * Storage backend held for the produce side.
 *
 * <p>Created in {@code initialize} from {@code config.storage(storageMetrics)} as an
 * instance separate from the fetch and background backends; released by {@link #close()}.</p>
 *
 * @return the produce storage backend held by this shared state
 */
public StorageBackend produceStorage() {
    return this.produceStorage;
}

/**
 * Storage backend for background work.
 *
 * <p>Per the field's own comment, this single instance is shared by the FileCleaner and
 * FileMerger executors, which run concurrently, and is kept apart from the fetch and
 * produce backends. It is released by {@link #close()}.</p>
 *
 * @return the background storage backend held by this shared state
 */
public StorageBackend backgroundStorage() {
    return this.backgroundStorage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,11 @@ public FetchHandler(final SharedState state) {
state.keyAlignmentStrategy(),
state.cache(),
state.controlPlane(),
state.buildStorage(),
state.fetchStorage(),
state.brokerTopicStats(),
state.config().fetchMetadataThreadPoolSize(),
state.config().fetchDataThreadPoolSize(),
// Separate storage client for lagging consumers to:
// 1. Isolate connection pool usage (lagging consumers shouldn't exhaust connections for hot path)
// 2. Allow independent tuning of timeouts/retries for cold storage access patterns
// (This requires some refactoring on how the storage client is built/configured)
// If thread pool size is 0, disabling lagging consumer support, don't create a separate client
//
// NOTE: The client for lagging consumers is created only when this FetchHandler (and Reader)
// is constructed. If fetchLaggingConsumerThreadPoolSize() is 0 at this time, no separate
// client is created and lagging consumer support is effectively disabled for the lifetime
// of this instance, even if the configuration is later reloaded with a non-zero value.
// Enabling lagging consumer support therefore requires a broker restart (or reconstruction
// of the SharedState/FetchHandler) so that a new storage client can be created.
state.config().fetchLaggingConsumerThreadPoolSize() > 0 ? state.buildStorage() : null,
state.maybeLaggingFetchStorage(),
state.config().fetchLaggingConsumerThresholdMs(),
state.config().fetchLaggingConsumerRequestRateLimit(),
state.config().fetchLaggingConsumerThreadPoolSize(),
Expand All @@ -76,7 +64,8 @@ public FetchHandler(final SharedState state) {
);
}

public FetchHandler(final Reader reader) {
// Visible for testing
FetchHandler(final Reader reader) {
this.reader = reader;
}

Expand Down
Loading
Loading