Skip to content

Commit

Permalink
misc: add user APM metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinKolarik committed Dec 17, 2024
1 parent c238626 commit ff9c0c1
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 25 deletions.
26 changes: 13 additions & 13 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
"crypto-random-string": "^5.0.0",
"csv-parser": "^3.0.0",
"elastic-apm-node": "^4.9.0",
"elastic-apm-utils": "^4.1.1",
"elastic-apm-utils": "^4.1.2",
"got": "^14.4.5",
"gunzip-maybe": "^1.4.2",
"h-logger2": "^1.3.1",
"h-logger2": "^1.3.2",
"h-logger2-elastic": "^6.2.1",
"http-errors": "^2.0.0",
"ipaddr.js": "^2.2.0",
Expand Down Expand Up @@ -98,7 +98,7 @@
"eslint": "^8.57.1",
"husky": "^9.1.7",
"lint-staged": "^15.2.11",
"mocha": "^11.0.1",
"mocha": "^11.0.2",
"mock-fs": "^5.4.1",
"nock": "^13.5.6",
"relative-day-utc": "^1.3.0",
Expand All @@ -114,7 +114,7 @@
},
"optionalDependencies": {
"bufferutil": "^4.0.8",
"utf-8-validate": "^6.0.3"
"utf-8-validate": "^6.0.5"
},
"scripts": {
"build": "tsc && npm run download:files && npm run commit:hash && cp -r public dist",
Expand All @@ -126,6 +126,7 @@
"stats": "tsx probes-stats/known.ts",
"start": "ELASTIC_APM_CONFIG_FILE=elastic-apm-node.cjs node --max_old_space_size=3584 --max-semi-space-size=128 --experimental-loader elastic-apm-node/loader.mjs -r elastic-apm-node/start.js dist/src/index.js",
"start:dev": "NODE_ENV=development FAKE_PROBE_IP=1 ELASTIC_APM_CONFIG_FILE=elastic-apm-node.cjs tsx src/index.ts",
"start:apm": "ELASTIC_APM_CONFIG_FILE=elastic-apm-node.cjs FAKE_PROBE_IP=1 node --max_old_space_size=3584 --max-semi-space-size=128 --experimental-loader elastic-apm-node/loader.mjs -r elastic-apm-node/start.js dist/src/index.js",
"start:test": "NODE_ENV=test knex migrate:latest && NODE_ENV=test ELASTIC_APM_CONFIG_FILE=elastic-apm-node.cjs tsx src/index.ts",
"lint": "npm run lint:js && npm run lint:types && npm run lint:docs",
"lint:fix": "npm run lint:js:fix && npm run lint:types && npm run lint:docs",
Expand Down
37 changes: 29 additions & 8 deletions src/lib/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import _ from 'lodash';
import apmAgent from 'elastic-apm-node';
import type { Server as SocketServer } from 'socket.io';
import type { Knex } from 'knex';

import { scopedLogger } from './logger.js';
import { fetchProbes, getWsServer, PROBES_NAMESPACE } from './ws/server.js';
import { getMeasurementRedisClient, type RedisClient } from './redis/measurement-client.js';
import { USERS_TABLE } from './http/auth.js';
import { client } from './sql/client.js';

const logger = scopedLogger('metrics');

Expand All @@ -19,6 +22,7 @@ export class MetricsAgent {
constructor (
private readonly io: SocketServer,
private readonly redis: RedisClient,
private readonly sql: Knex,
) {}

run (): void {
Expand All @@ -32,11 +36,11 @@ export class MetricsAgent {
// finished measurements use 2 keys
// 1 global key tracks the in-progress measurements
return Math.round((dbSize - awaitingSize - 1) / 2);
});
}, 60 * 1000);

this.registerAsyncCollector(`gp.probe.count.local`, async () => {
return this.io.of(PROBES_NAMESPACE).local.fetchSockets().then(sockets => sockets.length);
});
}, 10 * 1000);

this.registerAsyncGroupCollector('global probe stats', async () => {
const probes = await fetchProbes();
Expand All @@ -52,7 +56,24 @@ export class MetricsAgent {
'gp.probe.count.adopted': probes.filter(probe => probe.owner).length,
'gp.probe.count.total': probes.length,
};
});
}, 10 * 1000);

this.registerAsyncGroupCollector(`user stats`, async () => {
const result = await this.sql(USERS_TABLE)
.count('user_type as c')
.groupBy('user_type')
.select<{ user_type: string, c: number }[]>([ 'user_type' ]);

const countByType = _(result)
.mapKeys(record => `gp.user.count.${record.user_type}`)
.mapValues(record => record.c)
.value();

return {
...countByType,
'gp.user.count.total': _.sum(Object.values(countByType)),
};
}, 60 * 1000);
}

recordMeasurementTime (type: string, time: number): void {
Expand Down Expand Up @@ -138,17 +159,17 @@ export class MetricsAgent {
});
}

private registerAsyncCollector (name: string, callback: () => Promise<number>): void {
private registerAsyncCollector (name: string, callback: () => Promise<number>, interval: number): void {
this.timers[name] = setInterval(() => {
callback().then((value) => {
this.recordAsyncDatapoint(name, value);
}).catch((error) => {
logger.error(`Failed to collect an async metric "${name}"`, error);
});
}, 10 * 1000);
}, interval);
}

private registerAsyncGroupCollector (groupName: string, callback: () => Promise<{[k: string]: number}>): void {
private registerAsyncGroupCollector (groupName: string, callback: () => Promise<{[k: string]: number}>, interval: number): void {
this.timers[groupName] = setInterval(() => {
callback().then((group) => {
Object.entries(group).forEach(([ key, value ]) => {
Expand All @@ -157,15 +178,15 @@ export class MetricsAgent {
}).catch((error) => {
logger.error(`Failed to collect an async metric group "${groupName}"`, error);
});
}, 10 * 1000);
}, interval);
}
}

let agent: MetricsAgent;

export const getMetricsAgent = () => {
if (!agent) {
agent = new MetricsAgent(getWsServer(), getMeasurementRedisClient());
agent = new MetricsAgent(getWsServer(), getMeasurementRedisClient(), client);
}

return agent;
Expand Down

0 comments on commit ff9c0c1

Please sign in to comment.