Skip to content

Commit 5d1d287

Browse files
committed
rotor: added persistent store metrics
1 parent 44c133c commit 5d1d287

File tree

9 files changed

+97
-52
lines changed

9 files changed

+97
-52
lines changed

libs/core-functions/src/functions/lib/index.ts

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import {
1616
FuncReturn,
1717
FunctionLogger,
1818
JitsuFunction,
19-
Metrics,
19+
FunctionMetrics,
2020
TTLStore,
2121
} from "@jitsu/protocols/functions";
2222
import {
@@ -70,6 +70,31 @@ export type MetricsMeta = {
7070
retries?: number;
7171
};
7272

73+
export type FuncChainResult = {
74+
connectionId?: string;
75+
events: AnyEvent[];
76+
execLog: FunctionExecLog;
77+
};
78+
79+
export type FunctionExecRes = {
80+
receivedAt?: any;
81+
eventIndex: number;
82+
event?: any;
83+
metricsMeta?: MetricsMeta;
84+
functionId: string;
85+
error?: any;
86+
dropped?: boolean;
87+
ms: number;
88+
};
89+
90+
export type FunctionExecLog = FunctionExecRes[];
91+
92+
export interface RotorMetrics {
93+
logMetrics: (execLog: FunctionExecLog) => void;
94+
storeStatus: (namespace: string, operation: string, status: string) => void;
95+
close: () => void;
96+
}
97+
7398
export type FetchType = (
7499
url: string,
75100
opts?: FetchOpts,
@@ -92,7 +117,7 @@ export type FunctionChainContext = {
92117
fetch: InternalFetchType;
93118
store: TTLStore;
94119
anonymousEventsStore?: AnonymousEventsStore;
95-
metrics?: Metrics;
120+
metrics?: FunctionMetrics;
96121
connectionOptions?: any;
97122
};
98123

libs/core-functions/src/functions/lib/store.ts

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import parse from "parse-duration";
44
import { MongoClient, ReadPreference, Collection } from "mongodb";
55
import { RetryError } from "@jitsu/functions-lib";
66
import { getLog, Singleton } from "juava";
7+
import { RotorMetrics } from "./index";
78

89
export const defaultTTL = 60 * 60 * 24 * 31; // 31 days
910
export const maxAllowedTTL = 2147483647; // max allowed value for ttl in redis (68years)
@@ -69,7 +70,8 @@ export const createMongoStore = (
6970
namespace: string,
7071
mongo: Singleton<MongoClient>,
7172
useLocalCache: boolean,
72-
fast: boolean
73+
fast: boolean,
74+
metrics?: RotorMetrics
7375
): TTLStore => {
7476
const localCache: Record<string, StoreValue> = {};
7577
const readOptions = fast ? { readPreference: ReadPreference.NEAREST } : {};
@@ -113,22 +115,32 @@ export const createMongoStore = (
113115
}
114116
}
115117

116-
function storeErr(err: any, text: string) {
118+
function storeErr(operation: "get" | "set" | "del" | "ttl", err: any, text: string, metrics?: RotorMetrics) {
117119
log.atError().log(`${text}: ${err.message}`);
120+
if (metrics) {
121+
metrics.storeStatus(namespace, operation, "error");
122+
}
118123
if ((err.message ?? "").includes("timed out")) {
119124
return new RetryError(text + ": Timed out.");
120125
}
121126
return new RetryError(text + ": " + err.message);
122127
}
123128

129+
function success(operation: "get" | "set" | "del" | "ttl", metrics?: RotorMetrics) {
130+
if (metrics) {
131+
metrics.storeStatus(namespace, operation, "success");
132+
}
133+
}
134+
124135
return {
125136
get: async (key: string) => {
126137
try {
127138
const res =
128139
getFromLocalCache(key) || (await ensureCollection().then(c => c.findOne({ _id: key }, readOptions)));
140+
success("get", metrics);
129141
return res ? res.value : undefined;
130142
} catch (err: any) {
131-
throw storeErr(err, `Error getting key ${key} from mongo store ${namespace}`);
143+
throw storeErr("get", err, `Error getting key ${key} from mongo store ${namespace}`);
132144
}
133145
},
134146
getWithTTL: async (key: string) => {
@@ -139,9 +151,10 @@ export const createMongoStore = (
139151
return undefined;
140152
}
141153
const ttl = res.expireAt ? Math.max(Math.floor((res.expireAt.getTime() - new Date().getTime()) / 1000), 0) : -1;
154+
success("get", metrics);
142155
return { value: res.value, ttl };
143156
} catch (err: any) {
144-
throw storeErr(err, `Error getting key ${key} from mongo store ${namespace}`);
157+
throw storeErr("get", err, `Error getting key ${key} from mongo store ${namespace}`);
145158
}
146159
},
147160
set: async (key: string, obj: any, opts?: SetOpts) => {
@@ -165,9 +178,12 @@ export const createMongoStore = (
165178
if (useLocalCache) {
166179
localCache[key] = colObj;
167180
}
181+
})
182+
.then(() => {
183+
success("set", metrics);
168184
});
169185
} catch (err: any) {
170-
throw storeErr(err, `Error setting key ${key} in mongo store ${namespace}`);
186+
throw storeErr("set", err, `Error setting key ${key} in mongo store ${namespace}`);
171187
}
172188
},
173189
del: async (key: string) => {
@@ -179,21 +195,23 @@ export const createMongoStore = (
179195
delete localCache[key];
180196
}
181197
});
198+
success("del", metrics);
182199
} catch (err: any) {
183-
throw storeErr(err, `Error deleting key ${key} from mongo store ${namespace}`);
200+
throw storeErr("del", err, `Error deleting key ${key} from mongo store ${namespace}`);
184201
}
185202
},
186203
ttl: async (key: string) => {
187204
try {
188205
const res =
189206
getFromLocalCache(key) || (await ensureCollection().then(c => c.findOne({ _id: key }, readOptions)));
207+
success("ttl", metrics);
190208
return res
191209
? res.expireAt
192210
? Math.max(Math.floor((res.expireAt.getTime() - new Date().getTime()) / 1000), 0)
193211
: -1
194212
: -2;
195213
} catch (err: any) {
196-
throw storeErr(err, `Error getting key ${key} from mongo store ${namespace}`);
214+
throw storeErr("ttl", err, `Error getting key ${key} from mongo store ${namespace}`);
197215
}
198216
},
199217
};

libs/core-functions/src/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ export * as mongodbDestination from "./functions/mongodb-destination";
8181
export { mongodb, mongoAnonymousEventsStore } from "./functions/lib/mongodb";
8282
export type {
8383
MetricsMeta,
84+
RotorMetrics,
85+
FuncChainResult,
86+
FunctionExecLog,
87+
FunctionExecRes,
8488
FunctionContext,
8589
FunctionChainContext,
8690
FetchType,

services/rotor/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import Prometheus from "prom-client";
88
import { FunctionsHandler, FunctionsHandlerMulti } from "./http/functions";
99
import { initMaxMindClient, GeoResolver } from "./lib/maxmind";
1010
import { MessageHandlerContext, rotorMessageHandler } from "./lib/message-handler";
11-
import { DummyMetrics, Metrics } from "./lib/metrics";
11+
import { DummyMetrics } from "./lib/metrics";
1212
import { connectionsStore, functionsStore } from "./lib/repositories";
1313
import { Server } from "node:net";
1414
import { getApplicationVersion, getDiagnostics } from "./lib/version";

services/rotor/src/lib/functions-chain.ts

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ import {
55
createTtlStore,
66
EnrichedConnectionConfig,
77
EntityStore,
8+
FuncChainResult,
89
FunctionChainContext,
910
FunctionConfig,
1011
FunctionContext,
12+
FunctionExecLog,
1113
getBuiltinFunction,
1214
isDropResult,
1315
JitsuFunctionWrapper,
@@ -69,25 +71,6 @@ udfCache.on("del", (key, value) => {
6971
value.wrapper?.close();
7072
});
7173

72-
export type FuncChainResult = {
73-
connectionId?: string;
74-
events: AnyEvent[];
75-
execLog: FunctionExecLog;
76-
};
77-
78-
export type FunctionExecRes = {
79-
receivedAt?: any;
80-
eventIndex: number;
81-
event?: any;
82-
metricsMeta?: MetricsMeta;
83-
functionId: string;
84-
error?: any;
85-
dropped?: boolean;
86-
ms: number;
87-
};
88-
89-
export type FunctionExecLog = FunctionExecRes[];
90-
9174
export function checkError(chainRes: FuncChainResult) {
9275
for (const el of chainRes.execLog) {
9376
if (el.error && (el.error.name === DropRetryErrorName || el.error.name === RetryErrorName)) {
@@ -139,7 +122,8 @@ export function buildFunctionChain(
139122
connection.workspaceId,
140123
mongodb,
141124
false,
142-
fastStoreWorkspaceId.includes(connection.workspaceId)
125+
fastStoreWorkspaceId.includes(connection.workspaceId),
126+
rotorContext.metrics
143127
);
144128
if (rotorContext.redisClient) {
145129
store = createMultiStore(store, createTtlStore(connection.workspaceId, rotorContext.redisClient));

services/rotor/src/lib/message-handler.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { getLog, requireDefined } from "juava";
2-
import { Metrics } from "./metrics";
32
import { GeoResolver } from "./maxmind";
43
import { IngestMessage } from "@jitsu/protocols/async-request";
54
import { CONNECTION_IDS_HEADER } from "./rotor";
@@ -17,6 +16,7 @@ import {
1716
EnrichedConnectionConfig,
1817
FunctionConfig,
1918
WorkspaceWithProfiles,
19+
RotorMetrics,
2020
} from "@jitsu/core-functions";
2121
import NodeCache from "node-cache";
2222
import { buildFunctionChain, checkError, FuncChain, FuncChainFilter, runChain } from "./functions-chain";
@@ -35,7 +35,7 @@ export type MessageHandlerContext = {
3535
functionsStore: EntityStore<FunctionConfig>;
3636
workspaceStore: EntityStore<WorkspaceWithProfiles>;
3737
eventsLogger: EventsStore;
38-
metrics?: Metrics;
38+
metrics?: RotorMetrics;
3939
geoResolver?: GeoResolver;
4040
dummyPersistentStore?: TTLStore;
4141
redisClient?: Redis;

services/rotor/src/lib/metrics.ts

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
import { getLog, requireDefined, stopwatch } from "juava";
2-
import { FunctionExecLog, FunctionExecRes } from "./functions-chain";
32
import fetch from "node-fetch-commonjs";
4-
import { MetricsMeta, httpAgent, httpsAgent } from "@jitsu/core-functions";
3+
import {
4+
FunctionExecLog,
5+
FunctionExecRes,
6+
MetricsMeta,
7+
httpAgent,
8+
httpsAgent,
9+
RotorMetrics,
10+
} from "@jitsu/core-functions";
511

612
import omit from "lodash/omit";
713
import type { Producer } from "kafkajs";
814
import { getCompressionType } from "./rotor";
915
import { Readable } from "stream";
10-
import { randomUUID } from "node:crypto";
16+
import { Counter } from "prom-client";
1117

1218
const log = getLog("metrics");
1319
const bulkerBase = requireDefined(process.env.BULKER_URL, "env BULKER_URL is not defined");
@@ -27,17 +33,16 @@ type MetricsEvent = MetricsMeta & {
2733
events: number;
2834
};
2935

30-
export interface Metrics {
31-
logMetrics: (execLog: FunctionExecLog) => void;
32-
close: () => void;
33-
}
34-
35-
export const DummyMetrics: Metrics = {
36+
export const DummyMetrics: RotorMetrics = {
3637
logMetrics: () => {},
38+
storeStatus: () => {},
3739
close: () => {},
3840
};
3941

40-
export function createMetrics(producer?: Producer): Metrics {
42+
export function createMetrics(
43+
producer?: Producer,
44+
storeCounter?: Counter<"namespace" | "operation" | "status">
45+
): RotorMetrics {
4146
const buffer: MetricsEvent[] = [];
4247

4348
const flush = async (buf: MetricsEvent[]) => {
@@ -199,6 +204,11 @@ export function createMetrics(producer?: Producer): Metrics {
199204
buffer.length = 0;
200205
}
201206
},
207+
storeStatus: (namespace: string, operation: string, status: string) => {
208+
if (storeCounter) {
209+
storeCounter.labels(namespace, operation, status).inc();
210+
}
211+
},
202212
close: () => {
203213
clearInterval(interval);
204214
},

services/rotor/src/lib/rotor.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ import dayjs from "dayjs";
66
import utc from "dayjs/plugin/utc";
77
dayjs.extend(utc);
88
import { getRetryPolicy, retryBackOffTime, retryLogMessage } from "./retries";
9-
import { createMetrics, Metrics } from "./metrics";
10-
import { FuncChainFilter, FuncChainResult } from "./functions-chain";
9+
import { createMetrics } from "./metrics";
10+
import { FuncChainFilter } from "./functions-chain";
1111
import type { Admin, Consumer, Producer, KafkaMessage } from "kafkajs";
1212
import { CompressionTypes } from "kafkajs";
1313
import { functionFilter, MessageHandlerContext } from "./message-handler";
1414
import { connectionsStore, functionsStore, workspaceStore } from "./repositories";
15+
import { RotorMetrics, FuncChainResult } from "@jitsu/core-functions";
1516

1617
const log = getLog("kafka-rotor");
1718

@@ -43,7 +44,7 @@ export type KafkaRotorConfig = {
4344
};
4445

4546
export type KafkaRotor = {
46-
start: () => Promise<Metrics>;
47+
start: () => Promise<RotorMetrics>;
4748
close: () => Promise<void>;
4849
};
4950

@@ -54,7 +55,7 @@ export function kafkaRotor(cfg: KafkaRotorConfig): KafkaRotor {
5455
let admin: Admin;
5556
let closeQueue: () => Promise<void>;
5657
let interval: any;
57-
let metrics: Metrics;
58+
let metrics: RotorMetrics;
5859
return {
5960
start: async () => {
6061
const kafka = connectToKafka({ defaultAppId: kafkaClientId, ...cfg.credentials });
@@ -69,7 +70,12 @@ export function kafkaRotor(cfg: KafkaRotorConfig): KafkaRotor {
6970

7071
producer = kafka.producer({ allowAutoTopicCreation: false });
7172
await producer.connect();
72-
metrics = createMetrics(producer);
73+
const storeErrors = new Prometheus.Counter({
74+
name: "rotor_store_statuses",
75+
help: "rotor store statuses",
76+
labelNames: ["namespace", "operation", "status"] as const,
77+
});
78+
metrics = createMetrics(producer, storeErrors);
7379
admin = kafka.admin();
7480

7581
const topicOffsets = new Prometheus.Gauge({
@@ -87,21 +93,19 @@ export function kafkaRotor(cfg: KafkaRotorConfig): KafkaRotor {
8793
const messagesProcessed = new Prometheus.Counter({
8894
name: "rotor_messages_processed",
8995
help: "messages processed",
90-
// add `as const` here to enforce label names
9196
labelNames: ["topic", "partition"] as const,
9297
});
9398
const messagesRequeued = new Prometheus.Counter({
9499
name: "rotor_messages_requeued",
95100
help: "messages requeued",
96-
// add `as const` here to enforce label names
97101
labelNames: ["topic"] as const,
98102
});
99103
const messagesDeadLettered = new Prometheus.Counter({
100104
name: "rotor_messages_dead_lettered",
101105
help: "messages dead lettered",
102-
// add `as const` here to enforce label names
103106
labelNames: ["topic"] as const,
104107
});
108+
105109
interval = setInterval(async () => {
106110
try {
107111
for (const topic of kafkaTopics) {

types/protocols/functions.d.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ export interface Store {
2121
ttl(key: string): Promise<number>;
2222
}
2323

24-
export interface Metrics {
24+
export interface FunctionMetrics {
2525
counter(name: string): {
2626
//increment / decrement counter. Supports negative values
2727
inc: (value: number) => void;
@@ -70,7 +70,7 @@ export type FunctionContext<P extends AnyProps = AnyProps> = {
7070
log: FunctionLogger;
7171
fetch: FetchType;
7272
store: TTLStore;
73-
metrics?: Metrics;
73+
metrics?: FunctionMetrics;
7474
props: P;
7575
};
7676

0 commit comments

Comments
 (0)