Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Tiered Caching] Bug fix for IndicesRequestCache StaleKey management #13391

Merged
merged 1 commit into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Enabled mockTelemetryPlugin for IT and fixed OOM issues ([#13054](https://github.com/opensearch-project/OpenSearch/pull/13054))
- Fix from and size parameter can be negative when searching ([#13047](https://github.com/opensearch-project/OpenSearch/pull/13047))
- Fix implement mark() and markSupported() in class FilterStreamInput ([#13098](https://github.com/opensearch-project/OpenSearch/pull/13098))
- Fix IndicesRequestCache Stale calculation ([#13070](https://github.com/opensearch-project/OpenSearch/pull/13070)]
- Fix snapshot _status API to return correct status for partial snapshots ([#12812](https://github.com/opensearch-project/OpenSearch/pull/12812))
- Improve the error messages for _stats with closed indices ([#13012](https://github.com/opensearch-project/OpenSearch/pull/13012))
- Ignore BaseRestHandler unconsumed content check as it's always consumed. ([#13290](https://github.com/opensearch-project/OpenSearch/pull/13290))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
import org.opensearch.common.cache.service.CacheService;
Expand Down Expand Up @@ -216,19 +217,16 @@ void clear(CacheEntity entity) {
public void onRemoval(RemovalNotification<ICacheKey<Key>, BytesReference> notification) {
// In case this event happens for an old shard, we can safely ignore this as we don't keep track for old
// shards as part of request cache.

// Pass a new removal notification containing Key rather than ICacheKey<Key> to the CacheEntity for backwards compatibility.
Key key = notification.getKey().key;
RemovalNotification<Key, BytesReference> newNotification = new RemovalNotification<>(
key,
notification.getValue(),
notification.getRemovalReason()
);

cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(newNotification));
cacheCleanupManager.updateCleanupKeyToCountMapOnCacheEviction(
new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId)
);
CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId);
cacheCleanupManager.updateStaleCountOnEntryRemoval(cleanupKey, newNotification);
}

private ICacheKey<Key> getICacheKey(Key key) {
Expand Down Expand Up @@ -272,10 +270,11 @@ BytesReference getOrCompute(
OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey);
}
}
cacheCleanupManager.updateCleanupKeyToCountMapOnCacheInsertion(cleanupKey);
cacheCleanupManager.updateStaleCountOnCacheInsert(cleanupKey);
} else {
cacheEntity.onHit();
}

return value;
}

Expand Down Expand Up @@ -508,7 +507,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) {
*
* @param cleanupKey the CleanupKey to be updated in the map
*/
private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
return;
}
Expand All @@ -524,8 +523,29 @@ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum);
}

private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
/**
* Handles the eviction of a cache entry.
*
* <p>This method is called when an entry is evicted from the cache.
* We consider all removal notifications except with the reason Replaced
* {@link #incrementStaleKeysCount} would have removed the entries from the map and increment the {@link #staleKeysCount}
* Hence we decrement {@link #staleKeysCount} if we do not find the shardId or readerCacheKeyId in the map.
* Skip decrementing staleKeysCount if we find the shardId or readerCacheKeyId in the map since it would have not been accounted for in the staleKeysCount in
*
* @param cleanupKey the CleanupKey that has been evicted from the cache
* @param notification RemovalNotification of the cache entry evicted
*/
private void updateStaleCountOnEntryRemoval(CleanupKey cleanupKey, RemovalNotification<Key, BytesReference> notification) {
if (notification.getRemovalReason() == RemovalReason.REPLACED) {
// The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry
// does not affect the staleness count, we skip such notifications.
return;
}
if (cleanupKey.entity == null) {
// entity will only be null when the shard is closed/deleted
// we would have accounted this in staleKeysCount when the closing/deletion of shard would have closed the associated
// readers
staleKeysCount.decrementAndGet();
return;
}
IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
Expand All @@ -535,23 +555,41 @@ private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
}
ShardId shardId = indexShard.shardId();

cleanupKeyToCountMap.computeIfPresent(shardId, (shard, keyCountMap) -> {
keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, currentValue) -> {
// decrement the stale key count
cleanupKeyToCountMap.compute(shardId, (key, readerCacheKeyMap) -> {
if (readerCacheKeyMap == null || !readerCacheKeyMap.containsKey(cleanupKey.readerCacheKeyId)) {
// If ShardId is not present or readerCacheKeyId is not present
// it should have already been accounted for and hence been removed from this map
// so decrement staleKeysCount
staleKeysCount.decrementAndGet();
int newValue = currentValue - 1;
// Remove the key if the new value is zero by returning null; otherwise, update with the new value.
return newValue == 0 ? null : newValue;
});
return keyCountMap;
// Return the current map
return readerCacheKeyMap;
} else {
// If it is in the map, it is not stale yet.
// Proceed to adjust the count for the readerCacheKeyId in the map
// but do not decrement the staleKeysCount
Integer count = readerCacheKeyMap.get(cleanupKey.readerCacheKeyId);
// this should never be null
assert (count != null && count >= 0);
// Reduce the count by 1
int newCount = count - 1;
if (newCount > 0) {
// Update the map with the new count
readerCacheKeyMap.put(cleanupKey.readerCacheKeyId, newCount);
} else {
// Remove the readerCacheKeyId entry if new count is zero
readerCacheKeyMap.remove(cleanupKey.readerCacheKeyId);
}
// If after modification, the readerCacheKeyMap is empty, we return null to remove the ShardId entry
return readerCacheKeyMap.isEmpty() ? null : readerCacheKeyMap;
}
});
}

/**
* Updates the count of stale keys in the cache.
* This method is called when a CleanupKey is added to the keysToClean set.
*
* It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
* <p>It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
* If the CleanupKey's readerCacheKeyId is null or the CleanupKey's entity is not open, it increments the staleKeysCount
* by the total count of keys associated with the CleanupKey's ShardId in the cleanupKeyToCountMap and removes the ShardId from the map.
*
Expand All @@ -569,7 +607,7 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
ShardId shardId = indexShard.shardId();

// Using computeIfPresent to atomically operate on the countMap for a given shardId
cleanupKeyToCountMap.computeIfPresent(shardId, (key, countMap) -> {
cleanupKeyToCountMap.computeIfPresent(shardId, (currentShardId, countMap) -> {
if (cleanupKey.readerCacheKeyId == null) {
// Aggregate and add to staleKeysCount atomically if readerCacheKeyId is null
int totalSum = countMap.values().stream().mapToInt(Integer::intValue).sum();
Expand All @@ -578,18 +616,19 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
return null;
} else {
// Update staleKeysCount based on specific readerCacheKeyId, then remove it from the countMap
countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (k, v) -> {
staleKeysCount.addAndGet(v);
countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (readerCacheKey, count) -> {
staleKeysCount.addAndGet(count);
// Return null to remove the key after updating staleKeysCount
return null;
});

// Check if countMap is empty after removal to decide if we need to remove the shardId entry
if (countMap.isEmpty()) {
return null; // Returning null removes the entry for shardId
// Returning null removes the entry for shardId
return null;
}
}
return countMap; // Return the modified countMap to keep the mapping
// Return the modified countMap to retain updates
return countMap;
});
}

Expand Down Expand Up @@ -715,6 +754,11 @@ public void close() {
this.cacheCleaner.close();
}

// for testing
ConcurrentMap<ShardId, HashMap<String, Integer>> getCleanupKeyToCountMap() {
return cleanupKeyToCountMap;
}

private final class IndicesRequestCacheCleaner implements Runnable, Releasable {

private final IndicesRequestCacheCleanupManager cacheCleanupManager;
Expand Down
Loading
Loading