Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-839] Add blockHeight to subaccount websocket message #1585

Merged
merged 6 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions indexer/packages/kafka/src/websocket-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export function generateSubaccountMessageContents(
order: OrderFromDatabase | undefined,
perpetualMarket: PerpetualMarketFromDatabase,
placementStatus: OrderPlaceV1_OrderPlacementStatus,
blockHeight: string | undefined,
): SubaccountMessageContents {
const orderTIF: TimeInForce = protocolTranslations.protocolOrderTIFToTIF(
redisOrder.order!.timeInForce,
Expand Down Expand Up @@ -89,6 +90,7 @@ export function generateSubaccountMessageContents(
triggerPrice: getTriggerPrice(redisOrder.order!, perpetualMarket),
},
],
...(blockHeight && { blockHeight }),
};
return contents;
}
Expand All @@ -98,12 +100,14 @@ export function createSubaccountWebsocketMessage(
order: OrderFromDatabase | undefined,
perpetualMarket: PerpetualMarketFromDatabase,
placementStatus: OrderPlaceV1_OrderPlacementStatus,
blockHeight: string | undefined,
): Buffer {
const contents: SubaccountMessageContents = generateSubaccountMessageContents(
redisOrder,
order,
perpetualMarket,
placementStatus,
blockHeight,
);

const subaccountMessage: SubaccountMessage = SubaccountMessage.fromPartial({
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { clearData, migrate, teardown } from '../../src/helpers/db-helpers';
import { clear, getLatestBlockHeight, updateBlockHeight } from '../../src/loops/block-height-refresher';
import { defaultBlock2 } from '../helpers/constants';
import { seedData } from '../helpers/mock-generators';
import config from '../../src/config';

describe('blockHeightRefresher', () => {
beforeAll(async () => {
await migrate();
await seedData();
await updateBlockHeight();
});

afterAll(async () => {
await clearData();
await teardown();
});

describe('getLatestBlockHeight', () => {
it('successfully gets the latest block height after update', async () => {
await updateBlockHeight();
expect(getLatestBlockHeight()).toBe(defaultBlock2.blockHeight);
});
});

describe('clear', () => {
it('throws an error if block height does not exist', () => {
clear();
expect(() => getLatestBlockHeight()).toThrowError('Unable to find latest block height');
});

it('throws an error when clear is called in non-test environment', () => {
const originalEnv = config.NODE_ENV;
config.NODE_ENV = 'production';
expect(() => clear()).toThrowError('clear cannot be used in non-test env');
config.NODE_ENV = originalEnv;
});
});
});
1 change: 1 addition & 0 deletions indexer/packages/postgres/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export const postgresConfigSchema = {
ASSET_REFRESHER_INTERVAL_MS: parseInteger({ default: 30_000 }), // 30 seconds
MARKET_REFRESHER_INTERVAL_MS: parseInteger({ default: 30_000 }), // 30 seconds
LIQUIDITY_TIER_REFRESHER_INTERVAL_MS: parseInteger({ default: 30_000 }), // 30 seconds
BLOCK_HEIGHT_REFRESHER_INTERVAL_MS: parseInteger({ default: 1_000 }), // 1 second
USE_READ_REPLICA: parseBoolean({ default: false }),

// Optional environment variables.
Expand Down
1 change: 1 addition & 0 deletions indexer/packages/postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export * as TradingRewardAggregationTable from './stores/trading-reward-aggregat

export * as perpetualMarketRefresher from './loops/perpetual-market-refresher';
export * as assetRefresher from './loops/asset-refresher';
export * as blockHeightRefresher from './loops/block-height-refresher';
export * as liquidityTierRefresher from './loops/liquidity-tier-refresher';

export * as uuid from './helpers/uuid';
Expand Down
61 changes: 61 additions & 0 deletions indexer/packages/postgres/src/loops/block-height-refresher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import {
stats,
logger,
NodeEnv,
} from '@dydxprotocol-indexer/base';

import config from '../config';
import * as BlockTable from '../stores/block-table';
import { BlockFromDatabase, Options } from '../types';
Comment on lines +8 to +9
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using type-only imports for BlockTable and BlockFromDatabase as they are only used as types.

import type { BlockFromDatabase, Options } from '../types';
import type * as BlockTable from '../stores/block-table';

import { startUpdateLoop } from './loopHelper';

let latestBlockHeight: string = '';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove unnecessary type annotation for latestBlockHeight.

- let latestBlockHeight: string = '';
+ let latestBlockHeight = '';

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
let latestBlockHeight: string = '';
let latestBlockHeight = '';


/**
* Refresh loop to cache the latest block height from the database in-memory.
*/
export async function start(): Promise<void> {
await startUpdateLoop(
updateBlockHeight,
config.BLOCK_HEIGHT_REFRESHER_INTERVAL_MS,
'updateBlockHeight',
);
}

/**
* Updates in-memory latest block height.
*/
export async function updateBlockHeight(options?: Options): Promise<void> {
const startTime: number = Date.now();
try {
const latestBlock: BlockFromDatabase = await BlockTable.getLatest(
options || { readReplica: true },
);
latestBlockHeight = latestBlock.blockHeight;
stats.timing(`${config.SERVICE_NAME}.loops.update_block_height`, Date.now() - startTime);
// eslint-disable-next-line no-empty
} catch (error) { }
chenyaoy marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Gets the latest block height.
*/
export function getLatestBlockHeight(): string {
if (!latestBlockHeight) {
const message: string = 'Unable to find latest block height';
logger.error({
at: 'block-height-refresher#getLatestBlockHeight',
message,
});
throw new Error(message);
}
return latestBlockHeight;
Comment on lines +43 to +52
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

The extracted context from the handler files shows that getLatestBlockHeight is used directly without any apparent error handling mechanisms around it. This indicates that if getLatestBlockHeight throws an error, it might not be caught and handled properly in these contexts.

  • Files to review for error handling:
    • indexer/services/vulcan/src/handlers/order-remove-handler.ts
    • indexer/services/vulcan/src/handlers/order-place-handler.ts

Ensure that calls to getLatestBlockHeight are wrapped in try-catch blocks or have appropriate error handling mechanisms to prevent unhandled exceptions.

Analysis chain

Ensure robust error handling in the getLatestBlockHeight function.

Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify robust error handling in the `getLatestBlockHeight` function.

# Test: Search for the function usage. Expect: Robust error handling.
rg --type typescript $'getLatestBlockHeight'

Length of output: 78



Script:

#!/bin/bash
# Description: Verify robust error handling in the `getLatestBlockHeight` function.

# Test: Search for the function usage. Expect: Robust error handling.
rg --type ts 'getLatestBlockHeight'

Length of output: 3090



Script:

#!/bin/bash
# Description: Extract and inspect the usage of `getLatestBlockHeight` in handler files to verify robust error handling.

# Extract lines around the usage of `getLatestBlockHeight` in handler files.
rg --type ts --context 5 'getLatestBlockHeight' indexer/services/vulcan/src/handlers/order-remove-handler.ts indexer/services/vulcan/src/handlers/order-place-handler.ts

Length of output: 4264

}

export function clear(): void {
if (config.NODE_ENV !== NodeEnv.TEST) {
throw new Error('clear cannot be used in non-test env');
}
Comment on lines +56 to +58
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throwing errors in non-test environments could be risky. Consider using a safer approach to handle this condition.

- throw new Error('clear cannot be used in non-test env');
+ logger.error('clear cannot be used in non-test env');
+ return;

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
if (config.NODE_ENV !== NodeEnv.TEST) {
throw new Error('clear cannot be used in non-test env');
}
if (config.NODE_ENV !== NodeEnv.TEST) {
logger.error('clear cannot be used in non-test env');
return;
}


latestBlockHeight = '';
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export interface SubaccountMessageContents {
fills?: FillSubaccountMessageContents[],
transfers?: TransferSubaccountMessageContents,
tradingReward?: TradingRewardSubaccountMessageContents,
blockHeight?: string,
}

export interface PerpetualPositionSubaccountMessageContents {
Expand Down
7 changes: 5 additions & 2 deletions indexer/services/comlink/public/websocket-documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ This channel provides realtime information about orders, fills, transfers, perpe

### Initial Response

Returns everything from the `/v4/addresses/:address/subaccountNumber/:subaccountNumber`, and `/v4/orders?addresses=${address}&subaccountNumber=${subaccountNumber}&status=OPEN`.
Returns everything from the `/v4/addresses/:address/subaccountNumber/:subaccountNumber`, and `/v4/orders?addresses=${address}&subaccountNumber=${subaccountNumber}&status=OPEN` and the latest block height.

### Example
```tsx
Expand All @@ -84,7 +84,8 @@ Returns everything from the `/v4/addresses/:address/subaccountNumber/:subaccount
},
"marginEnabled": true
},
"orders": []
"orders": [],
"blockHeight": "5"
}
}
```
Expand Down Expand Up @@ -117,6 +118,8 @@ export interface SubaccountMessageContents {
fills?: FillSubaccountMessageContents[],
// Transfers that occur on the subaccount
transfers?: TransferSubaccountMessageContents,
// Latest block processed by Indexer
blockHeight?: string,
}

export interface PerpetualPositionSubaccountMessageContents {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ async function expectUpdatedPositionsSubaccountKafkaMessage(
_.keyBy(perpMarkets, PerpetualMarketColumns.id),
assetPositions,
_.keyBy(assets, AssetColumns.id),
blockHeight,
);

expectSubaccountKafkaMessage({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ function expectTransfersSubaccountKafkaMessage(
event.sender!.subaccountId!,
event.sender!.subaccountId!,
event.recipient!.subaccountId,
blockHeight,
);
}

Expand All @@ -661,6 +662,7 @@ function expectTransfersSubaccountKafkaMessage(
event.recipient!.subaccountId!,
event.sender!.subaccountId,
event.recipient!.subaccountId!,
blockHeight,
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,7 @@ export async function expectFillSubaccountKafkaMessageFromLiquidationEvent(
[convertPerpetualPosition(position!)],
perpetualMarketRefresher.getPerpetualMarketsMap(),
),
blockHeight,
};

expectSubaccountKafkaMessage({
Expand Down Expand Up @@ -739,6 +740,7 @@ export function expectOrderSubaccountKafkaMessage(
orders: [
orderObject,
],
blockHeight,
};

expectSubaccountKafkaMessage({
Expand Down Expand Up @@ -798,6 +800,7 @@ export async function expectOrderFillAndPositionSubaccountKafkaMessageFromIds(
fills: [
generateFillSubaccountMessage(fill!, perpetualMarket!.ticker),
],
blockHeight,
};

if (position !== undefined) {
Expand Down
14 changes: 14 additions & 0 deletions indexer/services/ender/__tests__/helpers/kafka-helper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import { updateBlockCache } from '../../src/caches/block-cache';
import { defaultPreviousHeight, defaultWalletAddress } from './constants';

describe('kafka-helper', () => {
const blockHeight: string = '5';

describe('addPositionsToContents', () => {
const defaultPerpetualPosition: PerpetualPositionFromDatabase = {
id: '',
Expand Down Expand Up @@ -81,10 +83,12 @@ describe('kafka-helper', () => {
{},
[],
{},
blockHeight,
);

expect(contents.perpetualPositions).toEqual(undefined);
expect(contents.assetPositions).toEqual(undefined);
expect(contents.blockHeight).toEqual(blockHeight);
});

it('successfully adds one asset position and one perp position', () => {
Expand All @@ -100,6 +104,7 @@ describe('kafka-helper', () => {
{ [defaultPerpetualMarket.id]: defaultPerpetualMarket },
[defaultAssetPosition],
{ [defaultAsset.id]: defaultAsset },
blockHeight,
);

expect(contents.perpetualPositions!.length).toEqual(1);
Expand Down Expand Up @@ -129,6 +134,7 @@ describe('kafka-helper', () => {
side: 'LONG',
size: defaultAssetPosition.size,
});
expect(contents.blockHeight).toEqual(blockHeight);
});

it('successfully adds one asset position', () => {
Expand All @@ -144,6 +150,7 @@ describe('kafka-helper', () => {
{},
[defaultAssetPosition],
{ [defaultAsset.id]: defaultAsset },
blockHeight,
);

expect(contents.perpetualPositions).toBeUndefined();
Expand All @@ -158,6 +165,7 @@ describe('kafka-helper', () => {
side: 'LONG',
size: defaultAssetPosition.size,
});
expect(contents.blockHeight).toEqual(blockHeight);
});

it('successfully adds one perp position', () => {
Expand All @@ -173,6 +181,7 @@ describe('kafka-helper', () => {
{ [defaultPerpetualMarket.id]: defaultPerpetualMarket },
[],
{},
blockHeight,
);

expect(contents.perpetualPositions!.length).toEqual(1);
Expand All @@ -193,6 +202,7 @@ describe('kafka-helper', () => {
});

expect(contents.assetPositions).toBeUndefined();
expect(contents.blockHeight).toEqual(blockHeight);
});

it('successfully adds multiple positions', () => {
Expand Down Expand Up @@ -222,6 +232,7 @@ describe('kafka-helper', () => {
},
],
{ [defaultAsset.id]: defaultAsset },
blockHeight,
);

// check perpetual positions
Expand Down Expand Up @@ -277,6 +288,7 @@ describe('kafka-helper', () => {
side: 'LONG',
size: assetSize,
});
expect(contents.blockHeight).toEqual(blockHeight);
});
});

Expand Down Expand Up @@ -343,6 +355,7 @@ describe('kafka-helper', () => {
senderSubaccountId,
senderSubaccountId,
recipientSubaccountId,
transfer.createdAtHeight,
);

expect(contents.transfers).toEqual({
Expand All @@ -361,6 +374,7 @@ describe('kafka-helper', () => {
createdAtHeight: transfer.createdAtHeight,
transactionHash: transfer.transactionHash,
});
expect(contents.blockHeight).toEqual(transfer.createdAtHeight);
});

it('successfully adds a transfer_in', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export abstract class AbstractOrderFillHandler<T> extends Handler<T> {
[position],
perpetualMarketRefresher.getPerpetualMarketsMap(),
),
blockHeight: this.block.height.toString(),
};
if (order !== undefined) {
message.orders = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export class ConditionalOrderPlacementHandler extends
orders: [
generateOrderSubaccountMessage(conditionalOrder, perpetualMarket.ticker),
],
blockHeight: this.block.height.toString(),
};

return [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { generateSubaccountMessageContents } from '@dydxprotocol-indexer/kafka';
import {
OrderFromDatabase, OrderModel,
OrderFromDatabase,
OrderModel,
OrderTable,
PerpetualMarketFromDatabase,
perpetualMarketRefresher,
Expand Down Expand Up @@ -98,6 +99,7 @@ export class StatefulOrderPlacementHandler
dbOrder,
perpetualMarket,
OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_OPENED,
this.block.height.toString(),
);

const subaccountIdProto: SubaccountId = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ export class SubaccountUpdateHandler extends Handler<SubaccountUpdate> {
perpetualMarketsMapping,
updatedAssetPositions,
assetsMap,
this.block.height.toString(),
);

return this.generateConsolidatedSubaccountKafkaEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export class TradingRewardsHandler extends Handler<TradingRewardsEventV1> {

const subaccountMessageContents: SubaccountMessageContents = {
tradingReward: tradingRewardSubaccountMessageContents,
blockHeight: this.block.height.toString(),
};

kafkaEvents.push(
Expand Down
2 changes: 2 additions & 0 deletions indexer/services/ender/src/handlers/transfer-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class TransferHandler extends Handler<TransferEventV1> {
this.event.sender!.subaccountId!,
this.event.sender!.subaccountId,
this.event.recipient!.subaccountId,
this.block.height.toString(),
);

kafkaEvents.push(
Expand All @@ -74,6 +75,7 @@ export class TransferHandler extends Handler<TransferEventV1> {
this.event.recipient!.subaccountId!,
this.event.sender!.subaccountId,
this.event.recipient!.subaccountId,
this.block.height.toString(),
);

kafkaEvents.push(
Expand Down
Loading
Loading