Skip to content

Commit

Permalink
chore: Updated kafkajs, langchain, & openai tests to node:test (#2723)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsumners-nr authored Nov 12, 2024
1 parent 636899b commit b6e4c5d
Show file tree
Hide file tree
Showing 20 changed files with 2,014 additions and 1,979 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,32 @@

'use strict'

const tap = require('tap')
const helper = require('../../lib/agent_helper')
const params = require('../../lib/params')
const test = require('node:test')
const tspl = require('@matteo.collina/tspl')

const { removeModules } = require('../../lib/cache-buster')
const { assertSegments, match } = require('../../lib/custom-assertions')
const params = require('../../lib/params')
const helper = require('../../lib/agent_helper')
const utils = require('./utils')
const SEGMENT_PREFIX = 'kafkajs.Kafka.consumer#'

const SEGMENT_PREFIX = 'kafkajs.Kafka.consumer#'
const broker = `${params.kafka_host}:${params.kafka_port}`

tap.beforeEach(async (t) => {
t.context.agent = helper.instrumentMockedAgent({
test.beforeEach(async (ctx) => {
ctx.nr = {}
ctx.nr.agent = helper.instrumentMockedAgent({
feature_flag: {
kafkajs_instrumentation: true
}
})

const { Kafka, logLevel } = require('kafkajs')
t.context.Kafka = Kafka
ctx.nr.Kafka = Kafka
const topic = utils.randomString()
t.context.topic = topic
ctx.nr.topic = topic
const clientId = utils.randomString('kafka-test')
t.context.clientId = clientId
ctx.nr.clientId = clientId

const kafka = new Kafka({
clientId,
Expand All @@ -37,52 +41,47 @@ tap.beforeEach(async (t) => {

const producer = kafka.producer()
await producer.connect()
t.context.producer = producer
ctx.nr.producer = producer
const consumer = kafka.consumer({ groupId: 'kafka' })
await consumer.connect()
t.context.consumer = consumer
ctx.nr.consumer = consumer
})

tap.afterEach(async (t) => {
helper.unloadAgent(t.context.agent)
test.afterEach(async (ctx) => {
helper.unloadAgent(ctx.nr.agent)
removeModules(['kafkajs'])
await t.context.consumer.disconnect()
await t.context.producer.disconnect()
await ctx.nr.consumer.disconnect()
await ctx.nr.producer.disconnect()
})

tap.test('send records correctly', (t) => {
t.plan(8)

const { agent, consumer, producer, topic } = t.context
test('send records correctly', async (t) => {
const plan = tspl(t, { plan: 8 })
const { agent, consumer, producer, topic } = t.nr
const message = 'test message'
const expectedName = 'produce-tx'
let txCount = 0

agent.on('transactionFinished', (tx) => {
txCount++
if (tx.name === expectedName) {
const name = `MessageBroker/Kafka/Topic/Produce/Named/${topic}`
const segment = tx.agent.tracer.getSegment()
if (tx.name !== expectedName) {
return
}

const foundSegment = segment.children.find((s) => s.name.endsWith(topic))
t.equal(foundSegment.name, name)
const name = `MessageBroker/Kafka/Topic/Produce/Named/${topic}`
const segment = tx.agent.tracer.getSegment()

const metric = tx.metrics.getMetric(name)
t.equal(metric.callCount, 1)
const sendMetric = agent.metrics.getMetric(
'Supportability/Features/Instrumentation/kafkajs/send'
)
t.equal(sendMetric.callCount, 1)
const foundSegment = segment.children.find((s) => s.name.endsWith(topic))
plan.equal(foundSegment.name, name)

const produceTrackingMetric = agent.metrics.getMetric(
`MessageBroker/Kafka/Nodes/${broker}/Produce/${topic}`
)
t.equal(produceTrackingMetric.callCount, 1)
}
const metric = tx.metrics.getMetric(name)
plan.equal(metric.callCount, 1)
const sendMetric = agent.metrics.getMetric(
'Supportability/Features/Instrumentation/kafkajs/send'
)
plan.equal(sendMetric.callCount, 1)

if (txCount === 2) {
t.end()
}
const produceTrackingMetric = agent.metrics.getMetric(
`MessageBroker/Kafka/Nodes/${broker}/Produce/${topic}`
)
plan.equal(produceTrackingMetric.callCount, 1)
})

helper.runInTransaction(agent, async (tx) => {
Expand All @@ -91,10 +90,10 @@ tap.test('send records correctly', (t) => {
const promise = new Promise((resolve) => {
consumer.run({
eachMessage: async ({ message: actualMessage }) => {
t.equal(actualMessage.value.toString(), message)
t.equal(actualMessage.headers['x-foo'].toString(), 'foo')
t.equal(actualMessage.headers.newrelic.toString(), '')
t.equal(actualMessage.headers.traceparent.toString().startsWith('00-'), true)
plan.equal(actualMessage.value.toString(), message)
plan.equal(actualMessage.headers['x-foo'].toString(), 'foo')
plan.equal(actualMessage.headers.newrelic.toString(), '')
plan.equal(actualMessage.headers.traceparent.toString().startsWith('00-'), true)
resolve()
}
})
Expand All @@ -117,13 +116,15 @@ tap.test('send records correctly', (t) => {

tx.end()
})

await plan.completed
})

tap.test('send passes along DT headers', (t) => {
test('send passes along DT headers', async (t) => {
const plan = tspl(t, { plan: 13 })
const { agent, consumer, producer, topic } = t.nr
const expectedName = 'produce-tx'

const { agent, consumer, producer, topic } = t.context

// These agent.config lines are utilized to simulate the inbound
// distributed trace that we are trying to validate.
agent.config.account_id = 'account_1'
Expand All @@ -143,8 +144,7 @@ tap.test('send passes along DT headers', (t) => {
}

if (txCount === 3) {
utils.verifyDistributedTrace({ t, consumeTxs, produceTx })
t.end()
utils.verifyDistributedTrace({ plan, consumeTxs, produceTx })
}
})

Expand Down Expand Up @@ -178,12 +178,13 @@ tap.test('send passes along DT headers', (t) => {

tx.end()
})
})

tap.test('sendBatch records correctly', (t) => {
t.plan(9)
await plan.completed
})

const { agent, consumer, producer, topic } = t.context
test('sendBatch records correctly', async (t) => {
const plan = tspl(t, { plan: 9 })
const { agent, consumer, producer, topic } = t.nr
const message = 'test message'
const expectedName = 'produce-tx'

Expand All @@ -193,23 +194,21 @@ tap.test('sendBatch records correctly', (t) => {
const segment = tx.agent.tracer.getSegment()

const foundSegment = segment.children.find((s) => s.name.endsWith(topic))
t.equal(foundSegment.name, name)
plan.equal(foundSegment.name, name)

const metric = tx.metrics.getMetric(name)
t.equal(metric.callCount, 1)
plan.equal(metric.callCount, 1)

t.equal(tx.isDistributedTrace, true)
plan.equal(tx.isDistributedTrace, true)
const sendMetric = agent.metrics.getMetric(
'Supportability/Features/Instrumentation/kafkajs/sendBatch'
)
t.equal(sendMetric.callCount, 1)
plan.equal(sendMetric.callCount, 1)

const produceTrackingMetric = agent.metrics.getMetric(
`MessageBroker/Kafka/Nodes/${broker}/Produce/${topic}`
)
t.equal(produceTrackingMetric.callCount, 1)

t.end()
plan.equal(produceTrackingMetric.callCount, 1)
}
})

Expand All @@ -219,10 +218,10 @@ tap.test('sendBatch records correctly', (t) => {
const promise = new Promise((resolve) => {
consumer.run({
eachMessage: async ({ message: actualMessage }) => {
t.equal(actualMessage.value.toString(), message)
t.match(actualMessage.headers['x-foo'].toString(), 'foo')
t.equal(actualMessage.headers.newrelic.toString(), '')
t.equal(actualMessage.headers.traceparent.toString().startsWith('00-'), true)
plan.equal(actualMessage.value.toString(), message)
match(actualMessage.headers['x-foo'].toString(), 'foo', { assert: plan })
plan.equal(actualMessage.headers.newrelic.toString(), '')
plan.equal(actualMessage.headers.traceparent.toString().startsWith('00-'), true)
resolve()
}
})
Expand All @@ -247,24 +246,27 @@ tap.test('sendBatch records correctly', (t) => {

tx.end()
})

await plan.completed
})

tap.test('consume outside of a transaction', async (t) => {
const { agent, consumer, producer, topic, clientId } = t.context
test('consume outside of a transaction', async (t) => {
const plan = tspl(t, { plan: 16 })
const { agent, consumer, producer, topic, clientId } = t.nr
const message = 'test message'

const txPromise = new Promise((resolve) => {
agent.on('transactionFinished', (tx) => {
utils.verifyConsumeTransaction({ t, tx, topic, clientId })
utils.verifyConsumeTransaction({ plan, tx, topic, clientId })
const sendMetric = agent.metrics.getMetric(
'Supportability/Features/Instrumentation/kafkajs/eachMessage'
)
t.equal(sendMetric.callCount, 1)
plan.equal(sendMetric.callCount, 1)

const consumeTrackingMetric = agent.metrics.getMetric(
`MessageBroker/Kafka/Nodes/${broker}/Consume/${topic}`
)
t.equal(consumeTrackingMetric.callCount, 1)
plan.equal(consumeTrackingMetric.callCount, 1)

resolve()
})
Expand All @@ -274,7 +276,7 @@ tap.test('consume outside of a transaction', async (t) => {
const testPromise = new Promise((resolve) => {
consumer.run({
eachMessage: async ({ message: actualMessage }) => {
t.equal(actualMessage.value.toString(), message)
plan.equal(actualMessage.value.toString(), message)
resolve()
}
})
Expand All @@ -286,11 +288,13 @@ tap.test('consume outside of a transaction', async (t) => {
messages: [{ key: 'key', value: message }]
})

return Promise.all([txPromise, testPromise])
await Promise.all([txPromise, testPromise])
await plan.completed
})

tap.test('consume inside of a transaction', async (t) => {
const { agent, consumer, producer, topic, clientId } = t.context
test('consume inside of a transaction', async (t) => {
const plan = tspl(t, { plan: 44 })
const { agent, consumer, producer, topic, clientId } = t.nr
const expectedName = 'testing-tx-consume'

const messages = ['one', 'two', 'three']
Expand All @@ -301,11 +305,16 @@ tap.test('consume inside of a transaction', async (t) => {
agent.on('transactionFinished', (tx) => {
txCount++
if (tx.name === expectedName) {
t.assertSegments(tx.trace.root, [`${SEGMENT_PREFIX}subscribe`, `${SEGMENT_PREFIX}run`], {
exact: false
})
assertSegments(
tx.trace.root,
[`${SEGMENT_PREFIX}subscribe`, `${SEGMENT_PREFIX}run`],
{
exact: false
},
{ assert: plan }
)
} else {
utils.verifyConsumeTransaction({ t, tx, topic, clientId })
utils.verifyConsumeTransaction({ plan, tx, topic, clientId })
}

if (txCount === messages.length + 1) {
Expand All @@ -321,7 +330,7 @@ tap.test('consume inside of a transaction', async (t) => {
consumer.run({
eachMessage: async ({ message: actualMessage }) => {
msgCount++
t.ok(messages.includes(actualMessage.value.toString()))
plan.ok(messages.includes(actualMessage.value.toString()))
if (msgCount === messages.length) {
resolve()
}
Expand All @@ -339,19 +348,25 @@ tap.test('consume inside of a transaction', async (t) => {
tx.end()
return Promise.all([txPromise, testPromise])
})

await plan.completed
})

tap.test('consume batch inside of a transaction', async (t) => {
const { agent, consumer, producer, topic } = t.context
test('consume batch inside of a transaction', async (t) => {
const plan = tspl(t, { plan: 10 })
const { agent, consumer, producer, topic } = t.nr
const expectedName = 'testing-tx-consume'

const messages = ['one', 'two', 'three', 'four', 'five']

const txPromise = new Promise((resolve) => {
agent.on('transactionFinished', (tx) => {
t.assertSegments(tx.trace.root, [`${SEGMENT_PREFIX}subscribe`, `${SEGMENT_PREFIX}run`], {
exact: false
})
assertSegments(
tx.trace.root,
[`${SEGMENT_PREFIX}subscribe`, `${SEGMENT_PREFIX}run`],
{ exact: false },
{ assert: plan }
)
resolve()
})
})
Expand All @@ -362,23 +377,23 @@ tap.test('consume batch inside of a transaction', async (t) => {
const testPromise = new Promise((resolve) => {
consumer.run({
eachBatch: async ({ batch }) => {
t.equal(
plan.equal(
batch.messages.length,
messages.length,
`should have ${messages.length} messages in batch`
)
batch.messages.forEach((m) => {
t.ok(messages.includes(m.value.toString()), 'should have message')
plan.ok(messages.includes(m.value.toString()), 'should have message')
})
const sendMetric = agent.metrics.getMetric(
'Supportability/Features/Instrumentation/kafkajs/eachBatch'
)
t.equal(sendMetric.callCount, 1)
plan.equal(sendMetric.callCount, 1)

const consumeTrackingMetric = agent.metrics.getMetric(
`MessageBroker/Kafka/Nodes/${broker}/Consume/${topic}`
)
t.equal(consumeTrackingMetric.callCount, 1)
plan.equal(consumeTrackingMetric.callCount, 1)

resolve()
}
Expand All @@ -395,4 +410,6 @@ tap.test('consume batch inside of a transaction', async (t) => {
tx.end()
return Promise.all([txPromise, testPromise])
})

await plan.completed
})
2 changes: 1 addition & 1 deletion test/versioned/kafkajs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"kafkajs": ">=2.0.0"
},
"files": [
"kafka.tap.js"
"kafka.test.js"
]
}
]
Expand Down
Loading

0 comments on commit b6e4c5d

Please sign in to comment.