From bc165712fd3b2febc37793fccf457bd544a777f7 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 30 Jul 2025 15:24:39 +0200 Subject: [PATCH 1/4] feat(query-orchestrator): QueryCache - improve usage of in-memory cache --- .../src/orchestrator/QueryCache.ts | 83 ++++++++++--------- 1 file changed, 46 insertions(+), 37 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 9159cdc855713..8146864210f74 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -95,6 +95,37 @@ export type CacheKey = [CacheKeyItem, CacheKeyItem, CacheKeyItem] | [CacheKeyItem, CacheKeyItem, CacheKeyItem, CacheKeyItem]; +export type QueryWithRetryAndReleaseOptions = { + cacheKey: CacheKey; + dataSource: string; + external: boolean; + priority?: number; + requestId?: string; + spanId?: string; + inlineTables?: InlineTables; + useCsvQuery?: boolean; + lambdaTypes?: TableStructure; + persistent?: boolean; + aliasNameToMember?: { [alias: string]: string }; +}; + +export type CacheQueryResultOptions = { + dataSource: string; + renewalThreshold?: number; + renewalKey?: any; + priority?: number; + external?: boolean; + requestId?: string; + waitForRenew?: boolean; + forceNoCache?: boolean; + useInMemory?: boolean; + useCsvQuery?: boolean; + lambdaTypes?: TableStructure; + persistent?: boolean; + primaryQuery?: boolean; + renewCycle?: boolean; +}; + type CacheEntry = { time: number; result: any; @@ -153,7 +184,9 @@ export class QueryCache { } this.memoryCache = new LRUCache({ - max: options.maxInMemoryCacheEntries || 10000 + max: options.maxInMemoryCacheEntries || 10000, + allowStale: false, + updateAgeOnGet: false, }); } @@ -416,25 +449,13 @@ export class QueryCache { lambdaTypes, persistent, aliasNameToMember, - }: { - cacheKey: CacheKey, - dataSource: string, - external: boolean, - 
priority?: number, - requestId?: string, - spanId?: string, - inlineTables?: InlineTables, - useCsvQuery?: boolean, - lambdaTypes?: TableStructure, - persistent?: boolean, - aliasNameToMember?: { [alias: string]: string }, - } + }: QueryWithRetryAndReleaseOptions ) { const queue = external ? this.getExternalQueue() : await this.getQueue(dataSource); - const _query = { + const queryDef = { queryKey: cacheKey, query, values, @@ -442,6 +463,8 @@ export class QueryCache { inlineTables, useCsvQuery, lambdaTypes, + // Used only for streaming + aliasNameToMember }; const opt = { @@ -451,12 +474,9 @@ export class QueryCache { }; if (!persistent) { - return queue.executeInQueue('query', cacheKey as QueryKey, _query, priority, opt); + return queue.executeInQueue('query', cacheKey as QueryKey, queryDef, priority, opt); } else { - return queue.executeInQueue('stream', cacheKey as QueryKey, { - ..._query, - aliasNameToMember, - }, priority, opt); + return queue.executeInQueue('stream', cacheKey as QueryKey, queryDef, priority, opt); } } @@ -842,27 +862,13 @@ export class QueryCache { values: string[], cacheKey: CacheKey, expiration: number, - options: { - renewalThreshold?: number, - renewalKey?: any, - priority?: number, - external?: boolean, - requestId?: string, - dataSource: string, - waitForRenew?: boolean, - forceNoCache?: boolean, - useInMemory?: boolean, - useCsvQuery?: boolean, - lambdaTypes?: TableStructure, - persistent?: boolean, - primaryQuery?: boolean, - renewCycle?: boolean, - } + options: CacheQueryResultOptions ) { const spanId = crypto.randomBytes(16).toString('hex'); - options = options || { dataSource: 'default' }; + const { renewalThreshold, primaryQuery, renewCycle } = options; const renewalKey = options.renewalKey && this.queryRedisKey(options.renewalKey); + const redisKey = this.queryRedisKey(cacheKey); const fetchNew = () => ( this.queryWithRetryAndRelease(query, values, { @@ -999,10 +1005,13 @@ export class QueryCache { }); } } + this.logger('Using 
cache for', { cacheKey, requestId: options.requestId, spanId, primaryQuery, renewCycle }); + if (options.useInMemory && renewedAgo + inMemoryCacheDisablePeriod <= renewalThreshold * 1000) { this.memoryCache.set(redisKey, parsedResult); } + return parsedResult.result; } else { this.logger('Missing cache for', { cacheKey, requestId: options.requestId, spanId, primaryQuery, renewCycle }); From 3689e596226cc87f31ad3ce775a36d4d274b46bd Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 30 Jul 2025 16:09:43 +0200 Subject: [PATCH 2/4] chore: refactor --- CLAUDE.md | 2 +- .../src/orchestrator/QueryCache.ts | 141 ++++++++++++------ 2 files changed, 93 insertions(+), 50 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 9a4f718d7bd33..79884aceba708 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -13,7 +13,7 @@ Cube is a semantic layer for building data applications. This is a monorepo cont ## Development Commands -**Note: This project uses Yarn as the package manager.** +**Note: This project uses Yarn as the package manager. 
Node.js v22.15.0 is recommended.** ### Core Build Commands ```bash diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 8146864210f74..9d792dd9d920a 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -132,6 +132,17 @@ type CacheEntry = { renewalKey: string; }; +type CheckCacheOptions = { + renewalKey: string; + renewalThreshold: number; + expiration: number; + options: CacheQueryResultOptions; + spanId: string; + cacheKey: CacheKey; + primaryQuery: boolean; + renewCycle: boolean; +}; + export interface QueryCacheOptions { refreshKeyRenewalThreshold?: number; externalQueueOptions?: any; @@ -857,6 +868,66 @@ export class QueryCache { callback: () => MaybeCancelablePromise, ) => this.cacheDriver.withLock(`lock:${key}`, callback, ttl, true); + protected async checkInCache( + redisKey: string, + opts: CheckCacheOptions + ): Promise { + // First check in-memory cache if enabled + if (opts.options.useInMemory) { + const inMemoryResult = this.checkInMemoryCache(redisKey, opts); + + if (inMemoryResult) { + return inMemoryResult; + } + } + + // If not found in memory, check cache driver + return this.cacheDriver.get(redisKey); + } + + protected checkInMemoryCache( + redisKey: string, + opts: CheckCacheOptions + ): any { + const inMemoryCacheDisablePeriod = 5 * 60 * 1000; + + const inMemoryValue = this.memoryCache.get(redisKey); + if (!inMemoryValue) { + return null; + } + + const renewedAgo = (new Date()).getTime() - inMemoryValue.time; + + if ( + opts.renewalKey && ( + !opts.renewalThreshold || + !inMemoryValue.time || + // Do not cache in memory in last 5 minutes of expiry. + // Most likely it'll cause race condition of refreshing data with different refreshKey values. 
+ renewedAgo + inMemoryCacheDisablePeriod > opts.renewalThreshold * 1000 || + inMemoryValue.renewalKey !== opts.renewalKey + ) || renewedAgo > opts.expiration * 1000 || renewedAgo > inMemoryCacheDisablePeriod + ) { + this.memoryCache.delete(redisKey); + return null; + } + + this.logger('Found in memory cache entry', { + cacheKey: opts.cacheKey, + time: inMemoryValue.time, + renewedAgo, + renewalKey: inMemoryValue.renewalKey, + newRenewalKey: opts.renewalKey, + renewalThreshold: opts.renewalThreshold, + requestId: opts.options.requestId, + spanId: opts.spanId, + primaryQuery: opts.primaryQuery, + renewCycle: opts.renewCycle + }); + + return inMemoryValue; + } + public async cacheQueryResult( query: string | QueryWithParams, values: string[], @@ -928,56 +999,27 @@ export class QueryCache { return fetchNew(); } - let res; - - const inMemoryCacheDisablePeriod = 5 * 60 * 1000; - - if (options.useInMemory) { - const inMemoryValue = this.memoryCache.get(redisKey); - if (inMemoryValue) { - const renewedAgo = (new Date()).getTime() - inMemoryValue.time; - - if ( - renewalKey && ( - !renewalThreshold || - !inMemoryValue.time || - // Do not cache in memory in last 5 minutes of expiry. - // Most likely it'll cause race condition of refreshing data with different refreshKey values. 
- renewedAgo + inMemoryCacheDisablePeriod > renewalThreshold * 1000 || - inMemoryValue.renewalKey !== renewalKey - ) || renewedAgo > expiration * 1000 || renewedAgo > inMemoryCacheDisablePeriod - ) { - this.memoryCache.delete(redisKey); - } else { - this.logger('Found in memory cache entry', { - cacheKey, - time: inMemoryValue.time, - renewedAgo, - renewalKey: inMemoryValue.renewalKey, - newRenewalKey: renewalKey, - renewalThreshold, - requestId: options.requestId, - spanId, - primaryQuery, - renewCycle - }); - res = inMemoryValue; - } + const cachedResult = await this.checkInCache( + redisKey, + { + renewalKey, + renewalThreshold, + expiration, + options, + spanId, + cacheKey, + primaryQuery, + renewCycle } - } - - if (!res) { - res = await this.cacheDriver.get(redisKey); - } + ); - if (res) { - const parsedResult = res; - const renewedAgo = (new Date()).getTime() - parsedResult.time; + if (cachedResult) { + const renewedAgo = (new Date()).getTime() - cachedResult.time; this.logger('Found cache entry', { cacheKey, - time: parsedResult.time, + time: cachedResult.time, renewedAgo, - renewalKey: parsedResult.renewalKey, + renewalKey: cachedResult.renewalKey, newRenewalKey: renewalKey, renewalThreshold, requestId: options.requestId, @@ -988,9 +1030,9 @@ export class QueryCache { if ( renewalKey && ( !renewalThreshold || - !parsedResult.time || + !cachedResult.time || renewedAgo > renewalThreshold * 1000 || - parsedResult.renewalKey !== renewalKey + cachedResult.renewalKey !== renewalKey ) ) { if (options.waitForRenew) { @@ -1008,11 +1050,12 @@ export class QueryCache { this.logger('Using cache for', { cacheKey, requestId: options.requestId, spanId, primaryQuery, renewCycle }); + const inMemoryCacheDisablePeriod = 5 * 60 * 1000; if (options.useInMemory && renewedAgo + inMemoryCacheDisablePeriod <= renewalThreshold * 1000) { - this.memoryCache.set(redisKey, parsedResult); + this.memoryCache.set(redisKey, cachedResult); } - return parsedResult.result; + return 
cachedResult.result; } else { this.logger('Missing cache for', { cacheKey, requestId: options.requestId, spanId, primaryQuery, renewCycle }); return fetchNew(); From 743b0302c59c7254c3e891c6cd3df24274c9f162 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 30 Jul 2025 17:41:33 +0200 Subject: [PATCH 3/4] chore: enable in memory cache for every --- .../src/orchestrator/QueryCache.ts | 39 ++++++++----------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 9d792dd9d920a..4983aea1d3351 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -135,8 +135,9 @@ type CacheEntry = { type CheckCacheOptions = { renewalKey: string; renewalThreshold: number; + requestId: string; expiration: number; - options: CacheQueryResultOptions; + useInMemory: boolean; spanId: string; cacheKey: CacheKey; primaryQuery: boolean; @@ -872,25 +873,28 @@ export class QueryCache { redisKey: string, opts: CheckCacheOptions ): Promise { - // First check in-memory cache if enabled - if (opts.options.useInMemory) { + if (opts.useInMemory) { const inMemoryResult = this.checkInMemoryCache(redisKey, opts); - if (inMemoryResult) { return inMemoryResult; } } - // If not found in memory, check cache driver - return this.cacheDriver.get(redisKey); + const cachedResult = await this.cacheDriver.get(redisKey); + + if (opts.useInMemory) { + this.memoryCache.set(redisKey, cachedResult, { + ttl: opts.renewalThreshold + }); + } + + return cachedResult; } protected checkInMemoryCache( redisKey: string, opts: CheckCacheOptions ): any { - const inMemoryCacheDisablePeriod = 5 * 60 * 1000; - const inMemoryValue = this.memoryCache.get(redisKey); if (!inMemoryValue) { return null; @@ -901,12 +905,9 @@ export class QueryCache { if ( opts.renewalKey && ( 
!opts.renewalThreshold || - !inMemoryValue.time || - // Do not cache in memory in last 5 minutes of expiry. - // Most likely it'll cause race condition of refreshing data with different refreshKey values. - renewedAgo + inMemoryCacheDisablePeriod > opts.renewalThreshold * 1000 || + renewedAgo > opts.renewalThreshold * 1000 || inMemoryValue.renewalKey !== opts.renewalKey - ) || renewedAgo > opts.expiration * 1000 || renewedAgo > inMemoryCacheDisablePeriod + ) || renewedAgo > opts.expiration * 1000 ) { this.memoryCache.delete(redisKey); return null; @@ -919,7 +920,7 @@ export class QueryCache { renewalKey: inMemoryValue.renewalKey, newRenewalKey: opts.renewalKey, renewalThreshold: opts.renewalThreshold, - requestId: opts.options.requestId, + requestId: opts.requestId, spanId: opts.spanId, primaryQuery: opts.primaryQuery, renewCycle: opts.renewCycle @@ -1002,17 +1003,17 @@ export class QueryCache { const cachedResult = await this.checkInCache( redisKey, { + requestId: options.requestId, + useInMemory: options.useInMemory, renewalKey, renewalThreshold, expiration, - options, spanId, cacheKey, primaryQuery, renewCycle } ); - if (cachedResult) { const renewedAgo = (new Date()).getTime() - cachedResult.time; this.logger('Found cache entry', { @@ -1049,12 +1050,6 @@ export class QueryCache { } this.logger('Using cache for', { cacheKey, requestId: options.requestId, spanId, primaryQuery, renewCycle }); - - const inMemoryCacheDisablePeriod = 5 * 60 * 1000; - if (options.useInMemory && renewedAgo + inMemoryCacheDisablePeriod <= renewalThreshold * 1000) { - this.memoryCache.set(redisKey, cachedResult); - } - return cachedResult.result; } else { this.logger('Missing cache for', { cacheKey, requestId: options.requestId, spanId, primaryQuery, renewCycle }); From bdc93902a787eb729a5c2c82887aee71b37c253c Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 30 Jul 2025 19:04:11 +0200 Subject: [PATCH 4/4] chore: fix --- .../cubejs-query-orchestrator/src/orchestrator/QueryCache.ts 
| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 4983aea1d3351..011f2e82e02a5 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -884,7 +884,7 @@ export class QueryCache { if (opts.useInMemory) { this.memoryCache.set(redisKey, cachedResult, { - ttl: opts.renewalThreshold + ttl: opts.renewalThreshold * 1000 }); }