Skip to content

Commit

Permalink
[backend] Add telemetry counter and gauge for graphql queries and mut…
Browse files Browse the repository at this point in the history
…ations (#8262)
  • Loading branch information
richard-julien authored Sep 5, 2024
1 parent be72b4f commit f69c8e8
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 22 deletions.
44 changes: 32 additions & 12 deletions opencti-platform/opencti-graphql/src/config/tracing.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { SEMATTRS_ENDUSER_ID } from '@opentelemetry/semantic-conventions';
import { MeterProvider, MetricReader, PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics';
import { type ObservableResult, ValueType } from '@opentelemetry/api-metrics';
import { ValueType } from '@opentelemetry/api-metrics';
import type { Counter } from '@opentelemetry/api-metrics/build/src/types/Metric';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import nodeMetrics from 'opentelemetry-node-metrics';
import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-http';
import nconf from 'nconf';
import { PrometheusExporter } from '@opentelemetry/exporter-prometheus';
import type { Gauge } from '@opentelemetry/api/build/src/metrics/Metric';
import type { AuthContext, AuthUser } from '../types/user';
import { ENABLED_METRICS, ENABLED_TRACING } from './conf';
import { isNotEmptyField } from '../database/utils';
Expand All @@ -19,27 +20,38 @@ class MeterManager {

private errors: Counter | null = null;

private latencies = 0;
private latencyGauge: Gauge | null = null;

private directBulkGauge: Gauge | null = null;

private sideBulkGauge: Gauge | null = null;

constructor(meterProvider: MeterProvider) {
this.meterProvider = meterProvider;
}

request() {
this.requests?.add(1);
request(attributes: any) {
this.requests?.add(1, attributes);
}

error(attributes: any) {
this.errors?.add(1, attributes);
}

latency(val: number, attributes: any) {
this.latencyGauge?.record(val, attributes);
}

error() {
this.errors?.add(1);
directBulk(val: number, attributes: any) {
this.directBulkGauge?.record(val, attributes);
}

latency(val: number) {
this.latencies = val;
sideBulk(val: number, attributes: any) {
this.sideBulkGauge?.record(val, attributes);
}

registerMetrics() {
const meter = this.meterProvider.getMeter('opencti-api');
// Register manual metrics
// - Basic counters
this.requests = meter.createCounter('opencti_api_requests', {
valueType: ValueType.INT,
Expand All @@ -50,9 +62,17 @@ class MeterManager {
description: 'Counts total number of errors'
});
// - Gauges
const latencyGauge = meter.createObservableGauge('opencti_api_latency');
latencyGauge.addCallback((observableResult: ObservableResult) => {
observableResult.observe(this.latencies);
this.latencyGauge = meter.createGauge('opencti_api_latency', {
valueType: ValueType.INT,
description: 'Latency computing per query'
});
this.directBulkGauge = meter.createGauge('opencti_api_direct_bulk', {
valueType: ValueType.INT,
description: 'Size of bulks for direct absorption'
});
this.sideBulkGauge = meter.createGauge('opencti_api_side_bulk', {
valueType: ValueType.INT,
description: 'Size of bulk for absorption impacts'
});
// - Library metrics
nodeMetrics(this.meterProvider, { prefix: '' });
Expand Down
8 changes: 5 additions & 3 deletions opencti-platform/opencti-graphql/src/database/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ import { now, runtimeFieldObservableValueScript } from '../utils/format';
import { ENTITY_TYPE_KILL_CHAIN_PHASE, ENTITY_TYPE_MARKING_DEFINITION, isStixMetaObject } from '../schema/stixMetaObject';
import { getEntitiesListFromCache, getEntityFromCache } from './cache';
import { ENTITY_TYPE_MIGRATION_STATUS, ENTITY_TYPE_SETTINGS, ENTITY_TYPE_STATUS, ENTITY_TYPE_USER, isInternalObject } from '../schema/internalObject';
import { telemetry } from '../config/tracing';
import { meterManager, telemetry } from '../config/tracing';
import {
isBooleanAttribute,
isDateAttribute,
Expand Down Expand Up @@ -3571,7 +3571,7 @@ const prepareIndexing = async (elements) => {
}
return preparedElements;
};
export const elIndexElements = async (context, user, message, elements) => {
export const elIndexElements = async (context, user, indexingType, elements) => {
const elIndexElementsFn = async () => {
// 00. Relations must be transformed before indexing.
const transformedElements = await prepareIndexing(elements);
Expand All @@ -3581,6 +3581,7 @@ export const elIndexElements = async (context, user, message, elements) => {
R.pipe(R.dissoc('_index'))(doc),
]);
if (body.length > 0) {
meterManager.directBulk(body.length, { type: indexingType });
await elBulk({ refresh: true, timeout: BULK_TIMEOUT, body });
}
// 02. If relation, generate impacts for from and to sides
Expand Down Expand Up @@ -3649,12 +3650,13 @@ export const elIndexElements = async (context, user, message, elements) => {
R.dissoc('_index', doc.data),
]);
if (bodyUpdate.length > 0) {
meterManager.sideBulk(bodyUpdate.length, { type: indexingType });
const bulkPromise = elBulk({ refresh: true, timeout: BULK_TIMEOUT, body: bodyUpdate });
await Promise.all([bulkPromise]);
}
return transformedElements.length;
};
return telemetry(context, user, `INSERT ${message}`, {
return telemetry(context, user, `INSERT ${indexingType}`, {
[SEMATTRS_DB_NAME]: 'search_engine',
[SEMATTRS_DB_OPERATION]: 'insert',
}, elIndexElementsFn);
Expand Down
13 changes: 10 additions & 3 deletions opencti-platform/opencti-graphql/src/graphql/telemetryPlugin.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ export default {
requestDidStart: /* v8 ignore next */ () => {
let tracingSpan;
const start = Date.now();
meterManager.request();
return {
didResolveOperation: (resolveContext) => {
const isWrite = resolveContext.operation && resolveContext.operation.operation === 'mutation';
Expand All @@ -48,14 +47,22 @@ export default {
tracingSpan.setAttribute(SEMATTRS_MESSAGING_MESSAGE_PAYLOAD_COMPRESSED_SIZE_BYTES, payloadSize);
}
if (requestError) {
meterManager.error();
const operation = sendContext.request.query.startsWith('mutation') ? 'mutation' : 'query';
const { operationName } = sendContext.request;
const type = sendContext.response.body.singleResult.errors.at(0)?.name ?? requestError.name;
const operationAttributes = { operation, name: operationName, type };
meterManager.error(operationAttributes);
if (tracingSpan) {
tracingSpan.setStatus({ code: 2, message: requestError.name });
}
} else {
const operation = sendContext.operation?.operation ?? 'query';
const operationName = sendContext.operationName ?? 'Unspecified';
const operationAttributes = { operation, name: operationName };
meterManager.request(operationAttributes);
const stop = Date.now();
const elapsed = stop - start;
meterManager.latency(elapsed);
meterManager.latency(elapsed, operationAttributes);
if (tracingSpan) {
tracingSpan.setStatus({ code: 1 });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ const historyIndexing = async (context: AuthContext, events: Array<SseEvent<Acti
};
});
// Bulk the history data insertions
return elIndexElements(context, SYSTEM_USER, `activity (${historyElements.length})`, historyElements);
return elIndexElements(context, SYSTEM_USER, ENTITY_TYPE_ACTIVITY, historyElements);
};

const eventsApplyHandler = async (context: AuthContext, events: Array<SseEvent<ActivityStreamEvent>>) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ const eventsApplyHandler = async (context: AuthContext, events: Array<SseEvent<S
};
});
// Bulk the history data insertions
await elIndexElements(context, SYSTEM_USER, `history (${historyElements.length})`, historyElements);
await elIndexElements(context, SYSTEM_USER, ENTITY_TYPE_HISTORY, historyElements);
};

const historyStreamHandler = async (streamEvents: Array<SseEvent<StreamDataEvent>>) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { logApp } from '../config/conf';
import { buildFileDataForIndexing } from '../modules/internal/document/document-domain';
import { elIndexElements } from '../database/engine';
import { INDEX_INTERNAL_OBJECTS } from '../database/utils';
import { ENTITY_TYPE_INTERNAL_FILE } from '../schema/internalObject';

export const up = async (next) => {
const context = executionContext('migration');
Expand All @@ -17,7 +18,7 @@ export const up = async (next) => {
const elements = files
.filter((file) => file.name.length <= 200)
.map((file) => ({ _index: INDEX_INTERNAL_OBJECTS, ...buildFileDataForIndexing(file) }));
await elIndexElements(context, SYSTEM_USER, 'Migration files registration', elements);
await elIndexElements(context, SYSTEM_USER, ENTITY_TYPE_INTERNAL_FILE, elements);
if (elementNotIndexed.length > 0) {
logApp.error('[MIGRATION] Some files were not indexed, you will need to re-upload them', { elementNotIndexed });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@ describe('Elasticsearch reindex', () => {
expect(data.toId === attackPatternId).toBeTruthy(); // attack-pattern--2fc04aa5-48c1-49ec-919a-b88241ef1d17
});
it('should relation reindex check consistency', async () => {
const indexPromise = elIndexElements(testContext, ADMIN_USER, 'test', [{ relationship_type: 'uses' }]);
const indexPromise = elIndexElements(testContext, ADMIN_USER, 'uses', [{ relationship_type: 'uses' }]);
// noinspection ES6MissingAwait
expect(indexPromise).rejects.toThrow();
});
Expand Down

0 comments on commit f69c8e8

Please sign in to comment.