Skip to content

Commit aa000df

Browse files
vincentwschaumergify[bot]
authored andcommitted
Improve query to find candles map. (#2650)
(cherry picked from commit 196dc84) # Conflicts: # indexer/services/ender/__tests__/lib/candles-generator.test.ts
1 parent 6ed24c2 commit aa000df

File tree

4 files changed

+91
-45
lines changed

4 files changed

+91
-45
lines changed

indexer/packages/postgres/src/stores/candle-table.ts

Lines changed: 58 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import _ from 'lodash';
21
import { PartialModelObject, QueryBuilder } from 'objection';
32

43
import { BUFFER_ENCODING_UTF_8, DEFAULT_POSTGRES_OPTIONS } from '../constants';
4+
import { knexReadReplica } from '../helpers/knex';
55
import { setupBaseQuery, verifyAllRequiredFields } from '../helpers/stores-helpers';
66
import Transaction from '../helpers/transaction';
77
import { getUuid } from '../helpers/uuid';
@@ -174,36 +174,66 @@ export async function findLatest(
174174

175175
export async function findCandlesMap(
176176
tickers: string[],
177-
resolutions: CandleResolution[],
178-
options: Options = DEFAULT_POSTGRES_OPTIONS,
179177
): Promise<CandlesMap> {
178+
if (tickers.length === 0) {
179+
return {};
180+
}
181+
180182
const candlesMap: CandlesMap = {};
183+
for (const ticker of tickers) {
184+
candlesMap[ticker] = {};
185+
}
181186

182-
await Promise.all(
183-
_.map(
184-
tickers,
185-
async (ticker: string) => {
186-
candlesMap[ticker] = {};
187-
const findLatestCandles: Promise<CandleFromDatabase | undefined>[] = resolutions.map(
188-
(resolution: CandleResolution) => findLatest(
189-
ticker,
190-
resolution,
191-
options,
192-
),
193-
);
194-
195-
// Map each resolution to its respective candle
196-
const allLatestCandles: (CandleFromDatabase | undefined)[] = await Promise.all(
197-
findLatestCandles,
198-
);
199-
_.forEach(allLatestCandles, (candle: CandleFromDatabase | undefined) => {
200-
if (candle !== undefined) {
201-
candlesMap[ticker][candle.resolution] = candle;
202-
}
203-
});
204-
},
205-
),
206-
);
187+
const minuteCandlesResult: {
188+
rows: CandleFromDatabase[],
189+
} = await knexReadReplica.getConnection().raw(
190+
`
191+
SELECT DISTINCT ON (
192+
ticker,
193+
resolution
194+
) candles.* FROM
195+
candles
196+
WHERE
197+
"ticker" IN (${tickers.map((ticker) => { return `'${ticker}'`; }).join(',')}) AND
198+
"startedAt" > NOW() - INTERVAL '3 hours' AND
199+
resolution IN ('1MIN', '5MINS', '15MINS', '30MINS', '1HOUR')
200+
ORDER BY
201+
ticker,
202+
resolution,
203+
"startedAt" DESC;
204+
`,
205+
) as unknown as {
206+
rows: CandleFromDatabase[],
207+
};
208+
const hourDayCandlesResult: {
209+
rows: CandleFromDatabase[],
210+
} = await knexReadReplica.getConnection().raw(
211+
`
212+
SELECT DISTINCT ON (
213+
ticker,
214+
resolution
215+
) candles.* FROM
216+
candles
217+
WHERE
218+
"ticker" IN (${tickers.map((ticker) => { return `'${ticker}'`; }).join(',')}) AND
219+
"startedAt" > NOW() - INTERVAL '2 days' AND
220+
resolution IN ('4HOURS', '1DAY')
221+
ORDER BY
222+
ticker,
223+
resolution,
224+
"startedAt" DESC;
225+
`,
226+
) as unknown as {
227+
rows: CandleFromDatabase[],
228+
};
229+
const latestCandles: CandleFromDatabase[] = minuteCandlesResult.rows
230+
.concat(hourDayCandlesResult.rows);
231+
for (const candle of latestCandles) {
232+
if (candlesMap[candle.ticker] === undefined) {
233+
candlesMap[candle.ticker] = {};
234+
}
235+
candlesMap[candle.ticker][candle.resolution] = candle;
236+
}
207237

208238
return candlesMap;
209239
}

indexer/services/ender/__tests__/lib/candles-generator.test.ts

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,14 @@ import {
3737
redis,
3838
} from '@dydxprotocol-indexer/redis';
3939
import { ORDERBOOK_MID_PRICES_CACHE_KEY_PREFIX } from '@dydxprotocol-indexer/redis/build/src/caches/orderbook-mid-prices-cache';
40+
import { DateTime, Settings } from 'luxon';
4041

4142
describe('candleHelper', () => {
43+
const startedAt: DateTime = helpers.calculateNormalizedCandleStartTime(
44+
testConstants.createdDateTime,
45+
CandleResolution.ONE_MINUTE,
46+
);
47+
4248
beforeAll(async () => {
4349
await dbHelpers.migrate();
4450
await dbHelpers.clearData();
@@ -48,13 +54,15 @@ describe('candleHelper', () => {
4854
beforeEach(async () => {
4955
await testMocks.seedData();
5056
await perpetualMarketRefresher.updatePerpetualMarkets();
57+
Settings.now = () => startedAt.plus({ minutes: 30 }).valueOf();
5158
});
5259

5360
afterEach(async () => {
5461
await dbHelpers.clearData();
5562
clearCandlesMap();
5663
jest.clearAllMocks();
5764
await redis.deleteAllAsync(redisClient);
65+
Settings.now = () => new Date().valueOf();
5866
});
5967

6068
afterAll(async () => {
@@ -87,10 +95,6 @@ describe('candleHelper', () => {
8795
orderbookMidPriceClose: undefined,
8896
orderbookMidPriceOpen: undefined,
8997
};
90-
const startedAt: IsoString = helpers.calculateNormalizedCandleStartTime(
91-
testConstants.createdDateTime,
92-
CandleResolution.ONE_MINUTE,
93-
).toISO();
9498
const previousStartedAt: IsoString = helpers.calculateNormalizedCandleStartTime(
9599
testConstants.createdDateTime.minus({ minutes: 1 }),
96100
CandleResolution.ONE_MINUTE,
@@ -304,8 +308,8 @@ describe('candleHelper', () => {
304308
'100', // open interest
305309
false, // block contains trades
306310
{ // expected candle
307-
id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE),
308-
startedAt,
311+
id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE),
312+
startedAt: startedAt.toISO(),
309313
ticker: testConstants.defaultPerpetualMarket.ticker,
310314
resolution: CandleResolution.ONE_MINUTE,
311315
low: closePrice,
@@ -343,8 +347,8 @@ describe('candleHelper', () => {
343347
true, // block contains trades
344348
{ // expected candle
345349
...defaultCandle,
346-
id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE),
347-
startedAt,
350+
id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE),
351+
startedAt: startedAt.toISO(),
348352
resolution: CandleResolution.ONE_MINUTE,
349353
startingOpenInterest: '100',
350354
orderbookMidPriceClose: '1000',
@@ -356,7 +360,7 @@ describe('candleHelper', () => {
356360
[
357361
'updates empty candle', // description
358362
{ // initial candle
359-
startedAt,
363+
startedAt: startedAt.toISO(),
360364
ticker: testConstants.defaultPerpetualMarket.ticker,
361365
resolution: CandleResolution.ONE_MINUTE,
362366
low: closePrice,
@@ -374,8 +378,8 @@ describe('candleHelper', () => {
374378
true, // block contains trades
375379
{ // expected candle
376380
...defaultCandle,
377-
id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE),
378-
startedAt,
381+
id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE),
382+
startedAt: startedAt.toISO(),
379383
resolution: CandleResolution.ONE_MINUTE,
380384
startingOpenInterest: existingStartingOpenInterest,
381385
orderbookMidPriceClose: null,
@@ -396,7 +400,7 @@ describe('candleHelper', () => {
396400
[
397401
'does not update candle when there are no trades and an existing candle', // description
398402
{ // initial candle
399-
startedAt,
403+
startedAt: startedAt.toISO(),
400404
ticker: testConstants.defaultPerpetualMarket.ticker,
401405
resolution: CandleResolution.ONE_MINUTE,
402406
low: lowPrice,
@@ -413,8 +417,8 @@ describe('candleHelper', () => {
413417
'100', // open interest
414418
false, // block contains trades
415419
{ // expected candle
416-
id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE),
417-
startedAt,
420+
id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE),
421+
startedAt: startedAt.toISO(),
418422
ticker: testConstants.defaultPerpetualMarket.ticker,
419423
resolution: CandleResolution.ONE_MINUTE,
420424
low: lowPrice,
@@ -463,7 +467,7 @@ describe('candleHelper', () => {
463467

464468
if (expectedCandle === undefined) {
465469
// Verify no candles in postgres and no kafka messages
466-
await verifyNoCandleInPostgres(CandleResolution.ONE_MINUTE, startedAt);
470+
await verifyNoCandleInPostgres(CandleResolution.ONE_MINUTE, startedAt.toISO());
467471
verifyNoCandlesKafkaMessages(publisher, CandleResolution.ONE_MINUTE);
468472
} else {
469473
const expectedCandles: CandleFromDatabase[] = [expectedCandle];
@@ -485,6 +489,15 @@ describe('candleHelper', () => {
485489
const usdVolume: string = Big(existingPrice).times(baseTokenVolume).toString();
486490
const orderbookMidPriceClose = '7500';
487491
const orderbookMidPriceOpen = '8000';
492+
<<<<<<< HEAD
493+
=======
494+
// Set candle start time to be far in the past to ensure all candles are new
495+
const startTime: IsoString = helpers.calculateNormalizedCandleStartTime(
496+
testConstants.createdDateTime.minus({ minutes: 100 }),
497+
CandleResolution.ONE_MINUTE,
498+
).toUTC().toISO();
499+
500+
>>>>>>> 196dc84f (Improve query to find candles map. (#2650))
488501
await Promise.all(
489502
_.map(Object.values(CandleResolution), (resolution: CandleResolution) => {
490503
return CandleTable.create({

indexer/services/ender/src/caches/block-cache.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ function isNextBlock(blockHeight: string): boolean {
107107
* All caches must be initialized in a Transaction to ensure consistency
108108
*/
109109
export async function initializeAllCaches(): Promise<void> {
110+
const start: number = Date.now();
110111
const txId: number = await Transaction.start();
111112
await Transaction.setIsolationLevel(txId, IsolationLevel.READ_COMMITTED);
112113

@@ -120,6 +121,10 @@ export async function initializeAllCaches(): Promise<void> {
120121
]);
121122

122123
await Transaction.rollback(txId);
124+
stats.timing(
125+
`${config.SERVICE_NAME}.initialize_caches`,
126+
Date.now() - start,
127+
);
123128
}
124129

125130
export function resetBlockCache(): void {

indexer/services/ender/src/caches/candle-cache.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ export async function startCandleCache(txId?: number): Promise<void> {
2323

2424
candlesMap = await CandleTable.findCandlesMap(
2525
tickers,
26-
Object.values(CandleResolution),
27-
{ txId },
2826
);
2927
}
3028

0 commit comments

Comments
 (0)