Skip to content

Commit 60b94df

Browse files
authored
[CT-708] Indexer track e2e latency (#1237)
* fwd through message times * use the var i made * post processing stat emission * post-forwarding timestamp * pass through event type from vulcan * event type to stat emissions * test fix function calls * WIP WIP WIP * fix tests * unused import * test that kafka messages are threaded
1 parent 56b7cce commit 60b94df

File tree

16 files changed

+234
-58
lines changed

16 files changed

+234
-58
lines changed

indexer/packages/v4-protos/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ export * from './codegen/google/protobuf/timestamp';
1616
export * from './codegen/dydxprotocol/indexer/protocol/v1/clob';
1717
export * from './codegen/dydxprotocol/indexer/protocol/v1/subaccount';
1818
export * from './codegen/dydxprotocol/indexer/shared/removal_reason';
19+
export * from './utils';
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import { Timestamp } from './codegen/google/protobuf/timestamp';
2+
3+
export const MILLIS_IN_NANOS: number = 1_000_000;
4+
export const SECONDS_IN_MILLIS: number = 1_000;
5+
export function protoTimestampToDate(
6+
protoTime: Timestamp,
7+
): Date {
8+
const timeInMillis: number = Number(protoTime.seconds) * SECONDS_IN_MILLIS +
9+
Math.floor(protoTime.nanos / MILLIS_IN_NANOS);
10+
11+
return new Date(timeInMillis);
12+
}

indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import {
5050
IndexerOrderId,
5151
PerpetualMarketCreateEventV1,
5252
DeleveragingEventV1,
53+
protoTimestampToDate,
5354
} from '@dydxprotocol-indexer/v4-protos';
5455
import {
5556
PerpetualMarketType,
@@ -63,7 +64,6 @@ import {
6364
generatePerpetualMarketMessage,
6465
generatePerpetualPositionsContents,
6566
} from '../../src/helpers/kafka-helper';
66-
import { protoTimestampToDate } from '../../src/lib/helper';
6767
import { DydxIndexerSubtypes, VulcanMessage } from '../../src/lib/types';
6868

6969
// TX Hash is SHA256, so is of length 64 hexadecimal without the '0x'.

indexer/services/ender/src/lib/helper.ts

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import {
88
import {
99
IndexerTendermintEvent,
1010
IndexerTendermintEvent_BlockEvent,
11-
Timestamp,
1211
OrderFillEventV1,
1312
MarketEventV1,
1413
SubaccountUpdateEventV1,
@@ -29,10 +28,6 @@ import Big from 'big.js';
2928
import _ from 'lodash';
3029
import { DateTime } from 'luxon';
3130

32-
import {
33-
MILLIS_IN_NANOS,
34-
SECONDS_IN_MILLIS,
35-
} from '../constants';
3631
import {
3732
AnnotatedSubaccountMessage,
3833
DydxIndexerSubtypes,
@@ -70,15 +65,6 @@ export function convertToSubaccountMessage(
7065
return subaccountMessage;
7166
}
7267

73-
export function protoTimestampToDate(
74-
protoTime: Timestamp,
75-
): Date {
76-
const timeInMillis: number = Number(protoTime.seconds) * SECONDS_IN_MILLIS +
77-
Math.floor(protoTime.nanos / MILLIS_IN_NANOS);
78-
79-
return new Date(timeInMillis);
80-
}
81-
8268
export function dateToDateTime(
8369
protoTime: Date,
8470
): DateTime {

indexer/services/socks/src/lib/message-forwarder.ts

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
logger,
44
InfoObject,
55
safeJsonStringify,
6+
STATS_NO_SAMPLING,
67
} from '@dydxprotocol-indexer/base';
78
import { updateOnMessageFunction } from '@dydxprotocol-indexer/kafka';
89
import { KafkaMessage } from 'kafkajs';
@@ -85,9 +86,10 @@ export class MessageForwarder {
8586
}
8687

8788
public onMessage(topic: string, message: KafkaMessage): void {
89+
const start: number = Date.now();
8890
stats.timing(
8991
`${config.SERVICE_NAME}.message_time_in_queue`,
90-
Date.now() - Number(message.timestamp),
92+
start - Number(message.timestamp),
9193
config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE,
9294
{
9395
topic,
@@ -125,18 +127,31 @@ export class MessageForwarder {
125127
return;
126128
}
127129

128-
const start: number = Date.now();
130+
const startForwardMessage: number = Date.now();
129131
this.forwardMessage(messageToForward);
130132
const end: number = Date.now();
131133
stats.timing(
132134
`${config.SERVICE_NAME}.forward_message`,
133-
end - start,
135+
end - startForwardMessage,
134136
config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE,
135137
{
136138
topic,
137139
channel: String(channel),
138140
},
139141
);
142+
143+
const originalMessageTimestamp = message.headers?.message_received_timestamp;
144+
if (originalMessageTimestamp !== undefined) {
145+
stats.timing(
146+
`${config.SERVICE_NAME}.message_time_since_received`,
147+
startForwardMessage - Number(originalMessageTimestamp),
148+
STATS_NO_SAMPLING,
149+
{
150+
topic,
151+
event_type: String(message.headers?.event_type),
152+
},
153+
);
154+
}
140155
}
141156

142157
public forwardMessage(message: MessageToForward): void {

indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ import { expectCanceledOrderStatus, expectOpenOrderIds, handleInitialOrderPlace
6262
import { expectOffchainUpdateMessage, expectWebsocketOrderbookMessage, expectWebsocketSubaccountMessage } from '../helpers/websocket-helpers';
6363
import { OrderbookSide } from '../../src/lib/types';
6464
import { getOrderIdHash, isLongTermOrder, isStatefulOrder } from '@dydxprotocol-indexer/v4-proto-parser';
65+
import { defaultKafkaHeaders } from '../helpers/constants';
6566
import config from '../../src/config';
6667

6768
jest.mock('@dydxprotocol-indexer/base', () => ({
@@ -196,6 +197,12 @@ describe('order-place-handler', () => {
196197
const replacementMessageIoc: KafkaMessage = createKafkaMessage(
197198
Buffer.from(Uint8Array.from(OffChainUpdateV1.encode(replacementUpdateIoc).finish())),
198199
);
200+
[replacementMessage, replacementMessageGoodTilBlockTime, replacementMessageConditional,
201+
replacementMessageFok, replacementMessageIoc].forEach((message) => {
202+
// eslint-disable-next-line no-param-reassign
203+
message.headers = defaultKafkaHeaders;
204+
});
205+
199206
const dbDefaultOrder: OrderFromDatabase = {
200207
...testConstants.defaultOrder,
201208
id: testConstants.defaultOrderId,
@@ -1225,7 +1232,11 @@ function expectWebsocketMessagesSent(
12251232
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
12261233
});
12271234

1228-
expectWebsocketSubaccountMessage(producerSendSpy.mock.calls[callIndex][0], subaccountMessage);
1235+
expectWebsocketSubaccountMessage(
1236+
producerSendSpy.mock.calls[callIndex][0],
1237+
subaccountMessage,
1238+
defaultKafkaHeaders,
1239+
);
12291240
callIndex += 1;
12301241
}
12311242

0 commit comments

Comments
 (0)