From 5899c6be932e774c2d518d44df7d8a0be7e096c6 Mon Sep 17 00:00:00 2001 From: gleip Date: Tue, 1 Jul 2025 18:13:46 +0300 Subject: [PATCH 1/2] fix: Meter for batch consumer --- examples/HttpGate/index.ts | 5 ++++- src/Client.ts | 18 +++++++++++++++++- src/Meter.ts | 28 ++++++++++++++++++++++++++-- src/interfaces.ts | 2 +- 4 files changed, 48 insertions(+), 5 deletions(-) diff --git a/examples/HttpGate/index.ts b/examples/HttpGate/index.ts index da6b935..d696e13 100644 --- a/examples/HttpGate/index.ts +++ b/examples/HttpGate/index.ts @@ -51,12 +51,15 @@ const upHttpGate = async (service: Service) => { message.meter.end(); }); - matchBatchEmitter.on('FibonacciNumber', messages => { + matchBatchEmitter.on('FibonacciNumber', async (messages, meter) => { + meter.start(); logger.info( 'Get new event "FibonacciNumber": ', messages.map(message => message.data), ); + await meter.measure(fakeRequest, [1000], {}, { location: 'external', type: 'dbms', name: 'postgresql2' }); messages.forEach(message => message.ack()); + meter.end(); }); const fastify = Fastify(); diff --git a/src/Client.ts b/src/Client.ts index eb84c50..e78ae3b 100644 --- a/src/Client.ts +++ b/src/Client.ts @@ -84,6 +84,8 @@ export class Client extends Root { private async startBatchWatch(fetcher: StreamBatchMsgFetcher, listener: EventEmitter, eventName: string) { while (true) { const batch: Partial>[] = []; + const baggages: Baggage[] = []; + const events = await fetcher.fetch(); for await (const event of events) { @@ -98,8 +100,22 @@ export class Client extends Root { message.nak = event.nak.bind(event); batch.push(message); + + let baggage: Baggage | undefined; + if (event.headers) { + baggage = this.getBaggageFromNATSHeader(event.headers); + if (baggage) { + baggages.push(baggage); + } + } + + message.meter = new Meter(eventName, baggage); + } + + if (batch.length > 0) { + const meter = new Meter(eventName, undefined, baggages); + listener.emit(eventName, batch, meter); } - if (batch.length > 0) listener.emit(eventName, batch); } } diff --git a/src/Meter.ts b/src/Meter.ts index 04ac023..c9b487f 100644 --- a/src/Meter.ts +++ b/src/Meter.ts @@ -1,15 +1,39 @@ import { Baggage, EventMeter, Tag, TagKey } from './interfaces'; import * as opentelemetry from '@opentelemetry/api'; import { Root } from './Root'; +import { randomBytes } from 'node:crypto'; export class Meter extends Root implements EventMeter { private span?: opentelemetry.Span; - constructor(private name: string, private baggage?: Baggage) { + private readonly SPAN_ID_BYTE_LENGTH = 8; + constructor(private name: string, private baggage?: Baggage, private links?: Baggage[]) { super(); } public start() { const tracer = opentelemetry.trace.getTracer(''); - this.span = tracer.startSpan(this.name, { kind: opentelemetry.SpanKind.CONSUMER }, this.getContext(this.baggage)); + + const links: opentelemetry.Link[] = []; + if (this.links && this.links.length) { + this.links.forEach(link => { + links.push({ + context: { traceId: link.traceId, spanId: link.spanId, traceFlags: link.traceFlags }, + }); + }); + } + + if (!this.baggage && links.length) { + this.baggage = { + traceId: links[0].context.traceId, + spanId: randomBytes(this.SPAN_ID_BYTE_LENGTH).toString('hex'), + traceFlags: links[0].context.traceFlags, + }; + } + + this.span = tracer.startSpan( + this.name, + { kind: opentelemetry.SpanKind.CONSUMER, links }, + this.getContext(this.baggage), + ); } public end(error?: Error) { if (!this.span) { diff --git a/src/interfaces.ts b/src/interfaces.ts index 772fd5d..b3e62da 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -167,7 +167,7 @@ export interface Listener { } export interface ListenerBatch { - on(action: A, handler: (params: Array[0]>) => void): void; + on(action: A, handler: (params: Array[0]>, meter: EventMeter) => void): void; off(action: A, handler: (params: Array[0]>) => void): void; } From e612c28915d4a3f8873ee143ae3f24f5f187b524 Mon Sep 17 00:00:00 2001 From: gleip Date: Tue, 1 Jul 2025 18:20:42 +0300 Subject: [PATCH 2/2] debug: Not send links field if link empty --- src/Meter.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Meter.ts b/src/Meter.ts index c9b487f..bf00377 100644 --- a/src/Meter.ts +++ b/src/Meter.ts @@ -29,9 +29,10 @@ export class Meter extends Root implements EventMeter { }; } + const linksOption = links.length ? { links } : {}; this.span = tracer.startSpan( this.name, - { kind: opentelemetry.SpanKind.CONSUMER, links }, + { kind: opentelemetry.SpanKind.CONSUMER, ...linksOption }, this.getContext(this.baggage), ); }