Cache segment metadata on the Overlord to speed up segment allocation and other task actions #17653

Open · wants to merge 26 commits into master
Conversation

@kfaraz (Contributor) commented Jan 22, 2025

Description

The Overlord performs several metadata operations on the tables druid_segments and druid_pendingSegments,
such as segment commit, allocation, upgrade and mark used / unused.

Segment allocation, in particular, involves several reads from and writes to the metadata store and can often become a bottleneck,
causing ingestion to slow down. This effect is particularly pronounced when streaming ingestion is enabled for
multiple datasources or when there is a lot of late-arriving data.

This patch adds an in-memory segment cache to the Overlord to speed up all segment metadata operations.

Design

Summary

  • Add Overlord runtime property druid.manager.segments.useCache to enable the cache (see the sample config after this list)
  • Keep cache disabled by default
  • When cache is enabled, read metadata from cache and write metadata to both metadata store and cache
  • Poll metadata store periodically in case any metadata update did not make it to the cache (should not happen under stable operational conditions)
  • Non-leader Overlords also poll the metadata store and keep the cache ready in case of failover
  • Upon becoming leader, Overlord needs to finish one poll successfully before cache can be used in transactions
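For reference, here is a minimal Overlord runtime.properties snippet using the properties described in this patch (values are illustrative; pollDuration is covered under "Lifecycle of cache" below):

```properties
# Enable the segment metadata cache on the Overlord (defaults to false)
druid.manager.segments.useCache=true
# Period at which the cache polls the metadata store (default PT1M)
druid.manager.segments.pollDuration=PT1M
```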

Segment metadata transaction with cache enabled

  • If cache is not enabled, fall back to old flow
  • If not leader, do not proceed
  • If cache is waiting for sync to finish after becoming leader, block until cache is ready
  • Acquire a lock on cache to ensure that another thread does not update it while we are reading from it.
  • Start transaction
  • Get leader term
  • Perform computations
  • For every read, just read from the cache
  • For every write
    • check if leadership has been lost or the leader term has changed; this safeguards against losing leadership in the middle of the transaction
    • if so, roll back the transaction
    • if not, record the write action to commit to the cache later
  • If transaction has succeeded, commit pending writes to cache
  • Close transaction
  • Release lock
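A condensed, hypothetical Java sketch of the flow above (illustrative names only, not the PR's actual classes, which are listed under "Code changes" below):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

// Illustrative only: shows the ordering of lock, leadership checks,
// deferred cache writes, and final cache commit described above.
class CachedTransactionFlowSketch
{
  interface MetadataWriter
  {
    void insertSegment(String segmentId);
  }

  private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(true);

  <T> T runTransaction(long leaderTermAtStart, Function<MetadataWriter, T> action)
  {
    cacheLock.writeLock().lock(); // hold the cache lock for the whole transaction
    try {
      final List<Runnable> pendingCacheWrites = new ArrayList<>();
      final MetadataWriter writer = segmentId -> {
        // Every write re-checks leadership so we can roll back if it was lost
        if (getCurrentLeaderTerm() != leaderTermAtStart) {
          throw new IllegalStateException("Lost leadership mid-transaction; rolling back");
        }
        writeToMetadataStore(segmentId);
        // Do not touch the cache yet; remember the write for after commit
        pendingCacheWrites.add(() -> writeToCache(segmentId));
      };
      final T result = action.apply(writer);      // reads inside action hit only the cache
      pendingCacheWrites.forEach(Runnable::run);  // transaction succeeded: apply to cache
      return result;
    }
    finally {
      cacheLock.writeLock().unlock();
    }
  }

  private long getCurrentLeaderTerm() { return 1L; }     // placeholder
  private void writeToMetadataStore(String segmentId) {} // placeholder
  private void writeToCache(String segmentId) {}         // placeholder
}
```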

Lifecycle of cache

  • Cache is enabled if druid.manager.segments.useCache=true on the Overlord
  • When the Overlord starts, the cache's start() method is called, putting it in STANDBY mode.
  • In STANDBY mode, the cache polls the metadata store at a period of druid.manager.segments.pollDuration to do the following:
    • Retrieve all segment IDs and their last updated timestamps
    • If the cache has stale or no information for any unused segment, update it
    • If the cache has stale or no information for any used segment, fetch entire segment payload
    • Retrieve all pending segments and update cache if it has stale information
  • Cache cannot be used for transactions while it is in STANDBY mode.
  • When Overlord becomes leader, cache moves to SYNC_PENDING mode.
  • When the next poll starts, it moves the cache from SYNC_PENDING to SYNC_STARTED
  • When this poll completes, cache is marked as READY
  • In READY mode, cache can be used for read and write transactions
  • When Overlord loses leadership, cache is moved back to STANDBY mode
  • When the Overlord stops, the cache's stop() method is called, stopping the poll
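The same lifecycle, condensed into code form (a hypothetical enum; the review threads below suggest the actual constants include LEADER_FIRST_SYNC_PENDING and LEADER_FIRST_SYNC_STARTED):

```java
// Illustrative only: states and transitions as described above.
enum CacheState
{
  STANDBY,       // Overlord started; cache polls but cannot serve transactions
  SYNC_PENDING,  // Overlord became leader; waiting for the next poll to start
  SYNC_STARTED,  // first post-leadership poll is in progress
  READY          // poll completed; cache usable for read/write transactions
}
// start()          -> STANDBY
// become leader    -> SYNC_PENDING
// next poll starts -> SYNC_STARTED
// poll completes   -> READY
// lose leadership  -> STANDBY
// stop()           -> polling halted
```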

Contents of cache

The cache maintains the following fields for every datasource.

| Field | Needed In |
|-------|-----------|
| Map<String, DataSegmentPlus> idToUsedSegment | Reads/writes for used segments |
| Set<String> unusedSegmentIds | Checking the set of existing segment IDs to avoid duplicate inserts |
| Map<Interval, Map<String, Integer>> intervalVersionToHighestUnusedPartitionNumber | Segment allocation, to avoid duplicate IDs |
| Map<Interval, Map<String, PendingSegmentRecord>> intervalToPendingSegments | Segment allocation |
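As an illustration of how segment allocation might consult these structures (a hypothetical helper; Interval is the Joda-Time type used throughout Druid):

```java
import java.util.Map;
import org.joda.time.Interval;

class AllocationLookupSketch
{
  // Hypothetical: next partition number for (interval, version) that will not
  // collide with any unused segment, per the third field in the table above.
  static int nextPartitionNumber(
      Map<Interval, Map<String, Integer>> intervalVersionToHighestUnusedPartitionNumber,
      Interval interval,
      String version
  )
  {
    final Integer highest = intervalVersionToHighestUnusedPartitionNumber
        .getOrDefault(interval, Map.of())
        .get(version);
    return highest == null ? 0 : highest + 1;
  }
}
```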

Code changes

Class / Description

SegmentsMetadataManagerConfig.useCache
  • Enables/disables the cache on the Overlord
DatasourceSegmentMetadataReader
  • Interface to perform segment metadata read operations
DatasourceSegmentMetadataWriter
  • Interface to perform segment metadata write operations
HeapMemorySegmentMetadataCache
  • Polls committed and pending segments from the metadata store
DatasourceSegmentCache
  • Caches committed and pending segments of a single datasource
SegmentMetadataTransaction
  • Encapsulates all read/write operations performed within a transaction
  • This abstraction allows the code to redirect all reads/writes within a transaction to either the cache or the metadata store itself
SqlSegmentMetadataTransaction
  • Performs reads/writes directly on the metadata store when the cache is disabled or not ready
CachedSegmentMetadataTransaction
  • Performs reads only from the cache, and writes to both the metadata store and the cache
SqlSegmentMetadataTransactionFactory
  • Creates a transaction based on the state of the cache
IndexerSQLMetadataStorageCoordinator
  • Performs all metadata transactions using the transaction factory
  • Metadata read methods moved to SqlSegmentsMetadataQuery
  • Metadata write methods moved to SqlSegmentMetadataTransaction

Testing

  • Run all ITs successfully with cache enabled in this commit
  • Update existing tests to run both with and without cache
    • IndexerSQLMetadataStorageCoordinatorTest
    • SegmentAllocateActionTest
    • SegmentAllocationQueueTest
  • Add DatasourceSegmentCacheTest

Pending items

  • Cluster testing

Release note

Add Overlord runtime property druid.manager.segments.useCache (default value false).
Set this to true to turn on segment metadata caching on the Overlord, which significantly speeds up
segment metadata operations such as reads and segment allocation.

Upgrade notes

The flag druid.manager.segments.useCache to enable the segment cache should be turned on only after
Druid has been upgraded to a version containing both this patch (#17653) and #17545.
When Druid is being downgraded to an older version, the feature flag must first be turned off.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@kfaraz marked this pull request as ready for review January 28, 2025 12:34
```java
// Assume that the metadata write operation succeeded
// Do not update the cache just yet, add to the list of pending writes
pendingCacheWrites.add(writer -> {
  T ignored = action.apply(writer);
```

Check notice (Code scanning / CodeQL): Unread local variable. Variable 'T ignored' is never read.
@AmatyaAvadhanula (Contributor) commented:

@kfaraz thank you for the changes.

Could you please call out the dependency on #17545 in the description?
I also think that it is important to turn off this feature flag prior to downgrades, because it is potentially dangerous if the Coordinator is downgraded to Druid version <= 31.0.1 while the Overlord briefly remains on version >= 33.x.y with this config turned on.

@kfaraz (Contributor, Author) commented Feb 5, 2025

Thanks for the suggestion, @AmatyaAvadhanula . I have added an Upgrade Notes section.

@AmatyaAvadhanula (Contributor) left a review comment:

I had a couple of questions about certain choices, which I have left as comments.
I was also hoping to understand the pros and cons of having pending segments and segments being accessed by the same ReadWriteLock in the cache.

Otherwise, the segment allocation changes look good to me.

Will try to wrap up the review of the caching mechanism soon.


```java
HeapMemoryDatasourceSegmentCache(String dataSource)
{
  super(true);
```
@AmatyaAvadhanula (Contributor):

Could you please add more details explaining the usage of a fair lock?
I recall a discussion where we wanted to change the lock in VersionedIntervalTimeline to non-fair (false) as well.

@kfaraz (Contributor, Author), Feb 6, 2025:

Sure, will add that in the javadoc.

Edit: I haven't really given much thought to whether we should go with fair or non-fair for the cache. Reading through the javadoc of ReentrantReadWriteLock, a fair lock seemed an appropriate choice. But I will take another look and document whatever we decide to do.
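For context, the fairness flag in question is the boolean ReentrantReadWriteLock constructor argument (plain JDK behavior, not specific to this PR):

```java
// true  -> fair: threads acquire the lock roughly in arrival order, which
//          avoids writer starvation at some cost in throughput.
// false -> non-fair (the default): allows barging; usually faster under contention.
ReentrantReadWriteLock fairLock = new ReentrantReadWriteLock(true);
ReentrantReadWriteLock nonFairLock = new ReentrantReadWriteLock();
```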

```java
/**
 * Not being used right now. Could allow lookup of visible segments for a given interval.
 */
private final SegmentTimeline usedSegmentTimeline = SegmentTimeline.forSegments(Set.of());
```
@AmatyaAvadhanula (Contributor):

Adding a timeline with potentially expensive operations because of its own lock seems risky and wasteful given that we are not using it.

Is there a reason we aren't using a TreeMap of Interval -> (DataSegment / SegmentId) instead?
I believe the perf impact would be significant when there are several intervals and segments.
We are creating a Timeline after fetching segments anyway.

@kfaraz (Contributor, Author):

Yeah, the timeline is not really being used in the code right now. I will just get rid of it for the time being.

@kfaraz (Contributor, Author), Feb 6, 2025:

> Is there a reason we aren't using a TreeMap of Interval -> (DataSegment / SegmentId) instead?
> I believe the perf impact would be significant when there are several intervals and segments.

Hmm, let me evaluate this once. I had decided against it originally since most segment searches are for overlapping intervals rather than exact matches. But I guess we can have some additional logic to prune out intervals which are disjoint, thus benefiting perf as you point out.

Other searches are by segment ID, so keeping an id -> segment map helped.

Eventually, we would most likely end up keeping a timeline itself.
That timeline could also be used to replace the timeline maintained in SqlSegmentsMetadataManager used by CompactionScheduler (and also the coordinator).

@kfaraz (Contributor, Author) commented Feb 6, 2025

> I was also hoping to understand the pros and cons of having pending segments and segments being accessed by the same ReadWriteLock in the cache.

There are certain transactions that modify both pending segments and regular segments.
To maintain consistency (and also for general simplicity), it is best to have a single lock that restricts access to the entire cache.
The locks are held only at the datasource level, and the cache operations are expected to be fast enough that this won't be a concern.
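A minimal sketch of that arrangement (field names taken from the "Contents of cache" table; the class shape is illustrative, not the PR's actual DatasourceSegmentCache, and DataSegmentPlus / PendingSegmentRecord are the Druid types referenced in that table):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.joda.time.Interval;

// One lock guards all of a datasource's cached state, so a transaction that
// touches both pending and committed segments sees a consistent view of both.
class DatasourceCacheSketch
{
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
  private final Map<String, DataSegmentPlus> idToUsedSegment = new HashMap<>();
  private final Map<Interval, Map<String, PendingSegmentRecord>> intervalToPendingSegments = new HashMap<>();
}
```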

@gianm (Contributor) commented Feb 6, 2025

> Handle race conditions

@kfaraz this is listed under "pending items". What race conditions are you aware of?

@kfaraz (Contributor, Author) commented Feb 7, 2025

> @kfaraz this is listed under "pending items". What race conditions are you aware of?

Thanks for calling this out, @gianm .
There were some race conditions that I had identified but I have handled them now. I have updated the PR description accordingly. For the race conditions and corner cases, there are also relevant comments/javadocs in the code.

  • Races between two different transactions initiated by IndexerSQLMetadataStorageCoordinator. The initial design did not acquire a lock on the cache for the entirety of a transaction, but this has been fixed now (see SqlSegmentMetadataTransactionFactory).
  • Race between polling the metadata store and writing to the cache. The polling thread could remove a segment ID from the cache if it was not found in the latest poll. This logic now removes only segments that were last updated before the poll started, so that we don't remove an entry that was added to the cache after the poll began (and is therefore not in the poll results).
  • Other minor stuff found during unit testing, which has already been handled.

I will take another pass through the code, just to ensure that I haven't missed anything, adding comments where necessary.
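For the polling race described above, the eviction guard presumably has this shape (a hypothetical fragment; all names are illustrative):

```java
// Evict only entries that are both absent from the poll result AND were last
// updated before the poll began; anything newer was simply added after the
// poll's snapshot and must not be removed.
if (!polledSegmentIds.contains(segmentId)
    && cacheEntry.getLastUpdatedTime().isBefore(pollStartTime)) {
  removeFromCache(segmentId);
}
```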

@kfaraz (Contributor, Author) commented Feb 7, 2025

@AmatyaAvadhanula , thanks for the suggestions! I have removed the timeline and added an interval map instead.

@gianm (Contributor) commented Feb 7, 2025

> There were some race conditions that I had identified but I have handled them now. I have updated the PR description accordingly. For the race conditions and corner cases, there are also relevant comments/javadocs in the code.

Thanks for the notes!

```java
  segmentAllocationQueue.becomeLeader();
  taskMaster.becomeHalfLeader(taskRunner, taskQueue);
}

@Override
public void stop()
{
  segmentMetadataCache.stopBeingLeader();
```
@gianm (Contributor):

Generally, the order of items in stop() should be the reverse of the order in start(), in case there are dependencies.

@kfaraz (Contributor, Author):

Fixed.

```diff
@@ -228,6 +230,10 @@ public void configure(Binder binder)
     JsonConfigProvider.bind(binder, "druid.indexer.task.default", DefaultTaskConfig.class);
     binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);

+    binder.bind(SegmentMetadataCache.class)
```
@gianm (Contributor):

Is this meant to override the binding to NoopSegmentMetadataCache in SQLMetadataStorageDruidModule? I thought multiple bindings typically weren't allowed.

I wonder why we need the binding outside of CliOverlord at all: why do other server types need to create one?

@kfaraz (Contributor, Author):

Ideally, the other server types shouldn't need the binding at all.
But we have this dep graph:
CoreInjectorBuilder -> DerbyMetadataStorageModule -> SQLMetadataStorageModule -> SqlSegmentMetadataTransactionFactory (required for IndexerSQLMetadataStorageCoordinator) -> SegmentMetadataCache.

CoreInjectorBuilder should not even load DerbyMetadataStorageModule; it should be loaded only in CliOverlord and CliCoordinator. But I decided to make this change in a separate PR so that I could test it out properly, just to be on the safe side.

Please let me know if you think I should include it in this PR itself.

```java
this.pollDuration = Configs.valueOrDefault(pollDuration, Period.minutes(1));
this.useCache = Configs.valueOrDefault(useCache, true);
```
@gianm (Contributor):

If this is what controls whether the new caching feature is enabled, then we want this to be false by default for now. We could change the default to true when it's more proven out.

@kfaraz (Contributor, Author):

Yes, I had enabled this temporarily to have all UTs and ITs work with the cache enabled.
I will disable it now as all tests seem to work as expected.
There are already some tests which run in both modes: cache enabled and disabled.

@kfaraz (Contributor, Author):

Fixed, default is now false (disabled).

```java
int maxPartitionNum = -1;
for (String id : unusedSegmentIds) {
  final SegmentId segmentId = SegmentId.tryParse(datasource, id);
  if (segmentId == null) {
```
@gianm (Contributor):

Warn (or perhaps even throw?) if the segment ID is unparseable, since in that case, the method may not be returning the correct answer.

@kfaraz (Contributor, Author):

Fixed, throws exception now.
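The fixed check plausibly takes this shape (DruidException.defensive appears in another hunk of this PR; the message text here is illustrative):

```java
final SegmentId segmentId = SegmentId.tryParse(datasource, id);
if (segmentId == null) {
  throw DruidException.defensive(
      "Could not parse stored segment ID[%s] for datasource[%s]", id, datasource
  );
}
```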

```diff
 for (List<String> partition : partitionedSegmentIds) {
-  fetchedSegments.addAll(retrieveSegmentBatchById(datasource, partition, false));
+  fetchedSegments.add(retrieveSegmentBatchById(datasource, partition, false));
```
@gianm (Contributor):

I wonder what this code will do exactly. There will be multiple CloseableIterator from retrieveSegmentBatchById existing at once. What effect does that have?

Does the metadata query get made lazily when the iterator first has hasNext() called? If so then it would lead to the metadata queries being issued sequentially, which seems fine. But, if the query is issued as part of iterator creation, this would lead to quite a lot of simultaneously open queries, which might cause problems with the metadata store.

@kfaraz (Contributor, Author), Feb 8, 2025:

Thanks for the suggestion! Added a CloseableIterator which keeps only one result set open at a time.
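The general shape of such an iterator, as a standalone sketch (not the PR's actual class): each batch's metadata query runs only when the previous batch is exhausted, so at most one result set is open at a time.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

interface CloseableIterator<T> extends Iterator<T>, Closeable {}

class LazyBatchIterator<T> implements CloseableIterator<T>
{
  private final Iterator<List<String>> batches;
  private final Function<List<String>, CloseableIterator<T>> fetchBatch;
  private CloseableIterator<T> current;

  LazyBatchIterator(
      Iterator<List<String>> batches,
      Function<List<String>, CloseableIterator<T>> fetchBatch
  )
  {
    this.batches = batches;
    this.fetchBatch = fetchBatch;
  }

  @Override
  public boolean hasNext()
  {
    // Advance past empty or exhausted batches, closing each before opening
    // the next; the batch query is issued only when fetchBatch is applied here.
    while ((current == null || !current.hasNext()) && batches.hasNext()) {
      closeCurrent();
      current = fetchBatch.apply(batches.next());
    }
    return current != null && current.hasNext();
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }

  @Override
  public void close()
  {
    closeCurrent();
  }

  private void closeCurrent()
  {
    if (current != null) {
      try {
        current.close();
      }
      catch (IOException e) {
        throw new RuntimeException(e);
      }
      current = null;
    }
  }
}
```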

```java
while (currentCacheState == CacheState.LEADER_FIRST_SYNC_PENDING
       || currentCacheState == CacheState.LEADER_FIRST_SYNC_STARTED) {
  try {
    cacheStateLock.wait(5 * 60_000);
```
@gianm (Contributor):

Use a static constant please. Btw, if the intent here is to have a specific timeout on waiting for sync, it should be checked again after wait returns (and then continue to wait if the timeout hasn't been reached yet). It is possible for wait to return early in case of spurious wakeup.

To ensure the thread wakes up timely, all of the cache state transitions should include a notifyAll.

@kfaraz (Contributor, Author), Feb 8, 2025:

No, the intent is just to avoid waiting forever.
The spurious wakeup is handled by verifyCacheIsReady (renamed to verifyCacheIsUsableAndAwaitSync()) itself.
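The conventional guarded-wait pattern under discussion, as a hypothetical helper (isSyncPending() and the constant name are illustrative):

```java
private static final long SYNC_WAIT_TIMEOUT_MILLIS = 5 * 60_000L;

// Re-check the condition after every wakeup (wait can return spuriously) and
// track the deadline so the total wait stays bounded. For this to wake up
// promptly, every cache state transition must call cacheStateLock.notifyAll().
void awaitSync(Object cacheStateLock) throws InterruptedException
{
  synchronized (cacheStateLock) {
    final long deadline = System.currentTimeMillis() + SYNC_WAIT_TIMEOUT_MILLIS;
    long remaining = SYNC_WAIT_TIMEOUT_MILLIS;
    while (isSyncPending() && remaining > 0) {
      cacheStateLock.wait(remaining);
      remaining = deadline - System.currentTimeMillis();
    }
  }
}
```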

```java
    cacheStateLock.wait(5 * 60_000);
  }
  catch (Exception e) {
    log.noStackTrace().error(e, "Error while waiting for cache to be ready");
```
@gianm (Contributor):

I think the only error wait will throw during normal operation is InterruptedException. Consider special-casing that, and logging at a lower level. Other exception types can continue to be logged at error level and re-thrown.

@kfaraz (Contributor, Author):

Added a catch for InterruptedException.

```java
}
catch (Throwable t) {
  log.error(t, "Error occurred while polling metadata store");
  log.makeAlert(t, "Error occurred while polling metadata store");
```
@gianm (Contributor):

Missing call to emit() after makeAlert. Also, no reason to call both log.error and log.makeAlert. The alert is logged when it is emitted.
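That is, the corrected call would collapse to a single line (emit() both sends the alert and logs it):

```java
log.makeAlert(t, "Error occurred while polling metadata store").emit();
```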

@kfaraz (Contributor, Author):

Thanks for catching this!

@kfaraz (Contributor, Author):

fixed.

```java
  throw DruidException.defensive("Cache has not been started yet");
}

currentCacheState = CacheState.LEADER_FIRST_SYNC_PENDING;
```
@gianm (Contributor):

Interrupt and re-start the current sync? That could help the leader gain leadership faster.

@kfaraz (Contributor, Author):

Updated.

```java
 * <li>Emit metrics</li>
 * </ul>
 */
private void syncWithMetadataStore()
```
@gianm (Contributor):

Have you been able to benchmark this method with a cluster with lots of segments (e.g. millions)? It will need to complete before allocation can work post-leadership-election, so I'm wondering how much time that will take.

@kfaraz (Contributor, Author):

Not yet. I am about to start testing on some clusters with a large number of segments. I will share the results here once the testing is done.

@kfaraz (Contributor, Author) commented Feb 9, 2025

@gianm , thanks for the review!
I have updated the PR based on your feedback.
