diff --git a/examples/HttpGate/index.ts b/examples/HttpGate/index.ts index da6b935..d696e13 100644 --- a/examples/HttpGate/index.ts +++ b/examples/HttpGate/index.ts @@ -51,12 +51,15 @@ const upHttpGate = async (service: Service) => { message.meter.end(); }); - matchBatchEmitter.on('FibonacciNumber', messages => { + matchBatchEmitter.on('FibonacciNumber', async (messages, meter) => { + meter.start(); logger.info( 'Get new event "FibonacciNumber": ', messages.map(message => message.data), ); + await meter.measure(fakeRequest, [1000], {}, { location: 'external', type: 'dbms', name: 'postgresql2' }); messages.forEach(message => message.ack()); + meter.end(); }); const fastify = Fastify(); diff --git a/src/Client.ts b/src/Client.ts index eb84c50..e78ae3b 100644 --- a/src/Client.ts +++ b/src/Client.ts @@ -84,6 +84,8 @@ export class Client extends Root { private async startBatchWatch(fetcher: StreamBatchMsgFetcher, listener: EventEmitter, eventName: string) { while (true) { const batch: Partial>[] = []; + const baggages: Baggage[] = []; + const events = await fetcher.fetch(); for await (const event of events) { @@ -98,8 +100,22 @@ export class Client extends Root { message.nak = event.nak.bind(event); batch.push(message); + + let baggage: Baggage | undefined; + if (event.headers) { + baggage = this.getBaggageFromNATSHeader(event.headers); + if (baggage) { + baggages.push(baggage); + } + } + + message.meter = new Meter(eventName, baggage); + } + + if (batch.length > 0) { + const meter = new Meter(eventName, undefined, baggages); + listener.emit(eventName, batch, meter); } - if (batch.length > 0) listener.emit(eventName, batch); } } diff --git a/src/Meter.ts b/src/Meter.ts index 04ac023..bf00377 100644 --- a/src/Meter.ts +++ b/src/Meter.ts @@ -1,15 +1,40 @@ import { Baggage, EventMeter, Tag, TagKey } from './interfaces'; import * as opentelemetry from '@opentelemetry/api'; import { Root } from './Root'; +import { randomBytes } from 'node:crypto'; export class Meter extends Root implements EventMeter { private span?: opentelemetry.Span; - constructor(private name: string, private baggage?: Baggage) { + private readonly SPAN_ID_BYTE_LENGTH = 8; + constructor(private name: string, private baggage?: Baggage, private links?: Baggage[]) { super(); } public start() { const tracer = opentelemetry.trace.getTracer(''); - this.span = tracer.startSpan(this.name, { kind: opentelemetry.SpanKind.CONSUMER }, this.getContext(this.baggage)); + + const links: opentelemetry.Link[] = []; + if (this.links && this.links.length) { + this.links.forEach(link => { + links.push({ + context: { traceId: link.traceId, spanId: link.spanId, traceFlags: link.traceFlags }, + }); + }); + } + + if (!this.baggage && links.length) { + this.baggage = { + traceId: links[0].context.traceId, + spanId: randomBytes(this.SPAN_ID_BYTE_LENGTH).toString('hex'), + traceFlags: links[0].context.traceFlags, + }; + } + + const linksOption = links.length ? { links } : {}; + this.span = tracer.startSpan( + this.name, + { kind: opentelemetry.SpanKind.CONSUMER, ...linksOption }, + this.getContext(this.baggage), + ); } public end(error?: Error) { if (!this.span) { diff --git a/src/interfaces.ts b/src/interfaces.ts index 772fd5d..b3e62da 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -167,7 +167,7 @@ export interface Listener { } export interface ListenerBatch { - on(action: A, handler: (params: Array[0]>) => void): void; + on(action: A, handler: (params: Array[0]>, meter: EventMeter) => void): void; off(action: A, handler: (params: Array[0]>) => void): void; }