Skip to content

Commit 901c174

Browse files
committed
rotor: count each multiplied event sent to destination as additional "active event"
1 parent 4d48edc commit 901c174

File tree

3 files changed

+6
-5
lines changed

3 files changed

+6
-5
lines changed

libs/core-functions/src/functions/lib/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ export type FuncChainResult = {
7777
};
7878

7979
export type FunctionExecRes = {
80-
receivedAt?: any;
80+
receivedAt?: Date;
8181
eventIndex: number;
8282
event?: any;
8383
metricsMeta?: MetricsMeta;

services/rotor/src/lib/functions-chain.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,10 +288,10 @@ export async function runChain(
288288
const event = events[i];
289289
let result: FuncReturn = undefined;
290290
const sw = stopwatch();
291-
const rat = new Date(event.receivedAt) as any;
291+
const rat = new Date(event.receivedAt);
292292
const execLogMeta = {
293293
eventIndex: i,
294-
receivedAt: rat && rat != "Invalid Date" ? rat : new Date(),
294+
receivedAt: !isNaN(rat.getTime()) ? rat : new Date(),
295295
functionId: f.id,
296296
metricsMeta: metricsMeta,
297297
};

services/rotor/src/lib/metrics.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ export function createMetrics(
6969
value: JSON.stringify({
7070
timestamp: d,
7171
workspaceId: m.workspaceId,
72-
messageId: m.messageId,
72+
// to count active events use composed key: messageId_eventIndex_receivedAt
73+
messageId: m.key,
7374
}),
7475
};
7576
}),
@@ -183,7 +184,7 @@ export function createMetrics(
183184
return prefix + status;
184185
})(el);
185186
buffer.push({
186-
key: crypto.randomUUID(),
187+
key: el.metricsMeta.messageId + "_" + el.eventIndex + "_" + d.getTime(),
187188
timestamp: d,
188189
...omit(el.metricsMeta, "retries"),
189190
functionId: el.functionId,

0 commit comments

Comments
 (0)