Warning: This package has been deprecated. NestJS microservices now fully implement this functionality, so there is no reason to maintain this package.
This is a NestJS module built on KafkaJS. It supports multiple connections and fits the NestJS coding style.
Import KafkaModule.forRoot into AppModule:
// app.module.js
import { Module } from "@nestjs/common";
import { KafkaModule } from "@buka/nestjs-kafka";

@Module({
  imports: [
    KafkaModule.forRoot({
      name: "my-kafka",
      groupId: "my-group-id",
      clientId: "my-client-id",
      brokers: ["my_kafka_host:9092"],
    }),
  ],
})
export class AppModule {}
Create a provider named AppConsumer that consumes messages:
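// app.consumer.js
import { Injectable } from "@nestjs/common";
import { KafkaConsumer, KafkaConsume, KafkaMessage } from "@buka/nestjs-kafka";

@Injectable()
@KafkaConsumer()
export class AppConsumer {
  // Receives the message as a plain string.
  @KafkaConsume("my-topic")
  async finishTask(@KafkaMessage() message: string): Promise<void> {
    // do something
    console.log(message);
  }

  // With { json: true } the handler is typed to receive an object.
  @KafkaConsume("other-topic", { json: true })
  async finishOtherTask(
    @KafkaMessage() message: Record<string, any>
  ): Promise<void> {
    // do something
    console.log(message);
  }
}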
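The { json: true } option on the second handler suggests that the handler receives a parsed payload instead of the raw string. A minimal sketch of such a decoding step, assuming the option simply applies JSON.parse to the message value; decodePayload is a hypothetical helper and not part of @buka/nestjs-kafka:

```typescript
// Hypothetical helper illustrating what a `json: true` option might do.
// Assumption: the raw Kafka value arrives as a string, or null for tombstones.
function decodePayload(raw: string | null, json: boolean): unknown {
  if (raw === null) {
    return null; // nothing to decode
  }
  return json ? JSON.parse(raw) : raw;
}

const asText = decodePayload('{"taskId":42}', false);
const asObject = decodePayload('{"taskId":42}', true) as { taskId: number };

console.log(typeof asText); // string
console.log(asObject.taskId); // 42
```

Invalid JSON makes JSON.parse throw, so a real handler would probably want to catch that case and decide whether to skip or retry the message.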
AppConsumer and AppService can be merged into one provider, but keeping them separate makes the code clearer.
Then, add AppConsumer to the providers of AppModule:
import { Module } from "@nestjs/common";
import { AppConsumer } from "./app.consumer";

@Module({
  imports: [
    /* ... */
  ],
  providers: [AppConsumer],
})
export class AppModule {}
KafkaProducer connects on module init and disconnects on module destroy.
To use it, import KafkaModule.forProducer(options) into AppModule:
// app.module.js
import { Module } from "@nestjs/common";
import { KafkaModule, Partitioners } from "@buka/nestjs-kafka";
import { AppService } from "./app.service";

@Module({
  imports: [
    KafkaModule.forRoot({
      name: "my-kafka",
      groupId: "my-group-id",
      clientId: "my-client-id",
      brokers: ["my_kafka_host:9092"],
    }),
    KafkaModule.forProducer({
      name: "my-kafka",
      createPartitioner: Partitioners.LegacyPartitioner,
    }),
  ],
  providers: [AppService],
})
export class AppModule {}
The options of .forProducer are exactly the same as the options of kafka.producer in KafkaJS.
Inject KafkaProducer into your AppService:
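// app.service.js
import { Injectable } from '@nestjs/common';
// Assumed to be exported by the package alongside KafkaModule.
import { InjectKafkaProducer, KafkaProducer } from '@buka/nestjs-kafka';

@Injectable()
export class AppService {
  constructor(
    @InjectKafkaProducer('my-kafka')
    private readonly producer: KafkaProducer
  ) {}

  async sendMessage(): Promise<void> {
    // Await the send so that delivery errors surface to the caller.
    await this.producer.send({
      topic: 'kafka-topic',
      messages: [{ value: 'Hello Kafka' }],
    });
  }
}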
The .send function of KafkaProducer is exactly the same as the .send function of the KafkaJS producer.
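Because .send follows KafkaJS, message values are sent as strings (or Buffers), so objects need to be serialized first. A small sketch of that step; OutgoingMessage is a local stand-in for the KafkaJS message shape, and TaskEvent and toMessages are hypothetical names used only for illustration:

```typescript
// Local stand-in for the subset of the KafkaJS message shape used here.
interface OutgoingMessage {
  key?: string;
  value: string;
}

// Hypothetical event type, for illustration only.
interface TaskEvent {
  id: string;
  status: "queued" | "done";
}

// Serialize each event to JSON and key it by id, so that key-based
// partitioners route events for the same task to the same partition.
function toMessages(events: TaskEvent[]): OutgoingMessage[] {
  return events.map((event) => ({
    key: event.id,
    value: JSON.stringify(event),
  }));
}

const messages = toMessages([{ id: "t-1", status: "queued" }]);
console.log(messages[0].value); // {"id":"t-1","status":"queued"}
```

The result could then be passed as the messages field of this.producer.send({ topic, messages }).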
Using KafkaService, you can create consumers and producers just as with plain KafkaJS:
// app.service.js
import { Injectable, OnModuleDestroy, OnModuleInit } from "@nestjs/common";
import { Consumer, Producer } from "kafkajs";
import { KafkaService } from "@buka/nestjs-kafka";

@Injectable()
export class AppService implements OnModuleInit, OnModuleDestroy {
  producer!: Producer;
  consumer!: Consumer;

  constructor(private readonly kafka: KafkaService) {}

  async onModuleInit(): Promise<void> {
    this.producer = this.kafka.producer();
    await this.producer.connect();

    this.consumer = this.kafka.consumer({
      groupId: "my-group-id",
    });
    // Connect before subscribing, as in the KafkaJS consumer examples.
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: "kafka-topic" });
    await this.consumer.run({
      eachMessage: async (context) => {
        // do something
      },
    });
  }

  async onModuleDestroy(): Promise<void> {
    await this.producer.disconnect();
    await this.consumer.disconnect();
  }
}
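With plain KafkaJS, a single eachMessage callback receives the messages of every subscribed topic, so routing by topic is left to the caller. A rough sketch of such routing; the interfaces are local stand-ins for the KafkaJS payload, and createRouter is a hypothetical helper:

```typescript
// Local stand-ins for the parts of the KafkaJS eachMessage payload used here.
interface IncomingMessage {
  value: { toString(): string } | null;
}
interface IncomingPayload {
  topic: string;
  message: IncomingMessage;
}

type TopicHandler = (value: string | null) => void;

// Hypothetical helper: build one callback that forwards each message to the
// handler registered for its topic and ignores topics without a handler.
function createRouter(handlers: Record<string, TopicHandler>) {
  return (payload: IncomingPayload): void => {
    const handler = handlers[payload.topic];
    if (handler === undefined) {
      return;
    }
    const value = payload.message.value;
    handler(value === null ? null : value.toString());
  };
}

const seen: string[] = [];
const route = createRouter({
  "kafka-topic": (value) => {
    seen.push(`task:${value}`);
  },
});

route({ topic: "kafka-topic", message: { value: "hello" } });
console.log(seen.join(",")); // task:hello
```

In the service above, route could be called from inside eachMessage. The @KafkaConsume decorator expresses the same topic-to-handler mapping declaratively.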
If you don't pay attention to where CreateRequestContext sits among the decorators, other method decorators may stop working, not only those of @buka/nestjs-kafka.
// app.consumer.js
import { Injectable } from "@nestjs/common";
import { KafkaConsumer, KafkaConsume, KafkaMessage } from "@buka/nestjs-kafka";
import { CreateRequestContext } from "@mikro-orm/mysql";

@Injectable()
@KafkaConsumer()
export class AppConsumer {
  @CreateRequestContext()
  // !! KafkaConsume decorator will not work !!
  @KafkaConsume("my-topic")
  async finishTask(@KafkaMessage() message: string): Promise<void> {
    console.log(message);
  }
}
There are two solutions:
- [Recommended] Split the logic into two methods:
  @Injectable()
  @KafkaConsumer()
  export class AppConsumer {
    @KafkaConsume("my-topic")
    async consumeMessage(@KafkaMessage() message: string): Promise<void> {
      // ... filter and format message
      await this.finishTask(JSON.parse(message));
    }

    @CreateRequestContext()
    async finishTask(task: Task): Promise<void> {
      // do something
      console.log(task);
    }
  }
- Keep CreateRequestContext as the last decorator on the method:
  @Injectable()
  @KafkaConsumer()
  export class AppConsumer {
    @KafkaConsume("my-topic")
    // use CreateRequestContext as the last decorator
    @CreateRequestContext()
    async finishTask(@KafkaMessage() message: string): Promise<void> {
      // do something
      console.log(message);
    }
  }
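One plausible explanation for why the order matters, sketched without any library: TypeScript applies stacked method decorators from bottom to top, so a decorator that replaces the method with a wrapper can hide metadata that an earlier decorator attached to the original function. The sketch assumes that KafkaConsume stores metadata on the method function and that CreateRequestContext swaps in a wrapper; consume, withContext, and decorate are simplified stand-ins, not the real implementations.

```typescript
type Method = (...args: unknown[]) => unknown;
type MethodDecoratorFn = (descriptor: { value: Method }) => void;

// Registry standing in for decorator metadata, keyed by the function object.
const topicRegistry = new Map<Method, string>();

// Metadata-style decorator (plays the role of KafkaConsume in this sketch):
// it tags whatever function is currently stored on the descriptor.
function consume(topic: string): MethodDecoratorFn {
  return (descriptor) => {
    topicRegistry.set(descriptor.value, topic);
  };
}

// Wrapping-style decorator (plays the role of CreateRequestContext):
// it replaces the function with a new wrapper function.
function withContext(): MethodDecoratorFn {
  return (descriptor) => {
    const original = descriptor.value;
    descriptor.value = (...args) => original(...args);
  };
}

// Decorators written top to bottom are applied bottom to top.
function decorate(method: Method, topToBottom: MethodDecoratorFn[]): Method {
  const descriptor = { value: method };
  for (const apply of [...topToBottom].reverse()) {
    apply(descriptor);
  }
  return descriptor.value;
}

const handler: Method = () => "done";

// Wrapper on top: the tag lands on the inner function, so a scanner that
// inspects the final method finds nothing.
const broken = decorate(handler, [withContext(), consume("my-topic")]);
console.log(topicRegistry.get(broken)); // undefined

// Wrapper at the bottom: the wrapper itself gets tagged.
const working = decorate(handler, [consume("my-topic"), withContext()]);
console.log(topicRegistry.get(working)); // my-topic
```

Under these assumptions, both solutions above work for the same reason: the function that carries the consumer metadata is the one that finally ends up on the class.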