diff --git a/README.md b/README.md index 7e65531..0245fdd 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ That support multiple connections and fits the coding style of nestjs. ## Usage -Import `KafkaModule`: +Import `KafkaModule.forRoot` to `AppModule`: ```typescript // app.module.js @@ -33,7 +33,9 @@ import { KafkaModule } from "@buka/nestjs-kafka"; export class AppModule {} ``` -Add a provider named `AppConsumer` that consume message: +### KafkaConsumer + +Create a provider named `AppConsumer` that consume messages: ```typescript // app.consumer.js @@ -56,7 +58,9 @@ export class AppConsumer { } ``` -Append `AppConsumer` to `AppModule`: +> `AppConsumer` and `AppService` can be merged into one provider, but writing them separately will make the code clearer. + +Then, append `AppConsumer` to `AppModule`: ```typescript import { Module } from "@nestjs/common"; @@ -70,3 +74,96 @@ import { AppConsumer } from "./app.consumer"; }) export class AppModule {} ``` + +### KafkaProducer + +`KafkaProducer` will connect on module init and disconnect on module destroy. +To use this, import `KafkaModule.forProducer(options)` to `AppModule`: + +```typescript +// app.module.js +import { Module } from "@nestjs/common"; +import { KafkaModule, Partitioners } from "@buka/nestjs-kafka"; +import AppService from "./app.service"; + +@Module({ + imports: [ + KafkaModule.forRoot({ + name: "my-kafka", + groupId: "my-group-id", + clientId: "my-client-id", + brokers: ["my_kafka_host:9092"], + }), + KafkaModule.forProducer({ + name: "my-kafka", + createPartitioner: Partitioners.LegacyPartitioner, + }), + ], + provider: [AppService], +}) +export class AppModule {} +``` + +> The `options` of `.forProducer` is exactly the same as [the `options` of `kafka.producer` in KafkaJS](https://kafka.js.org/docs/producing)。 + +Inject `KafkaProducer` to your `AppService`: + +```typescript +// app.service.js +@Injectable() +export class AppService { + constructor( + @InjectKafkaProducer('my-kafka') + private readonly producer: KafkaProducer + ) {} + + async sendMessage() { + this.producer.send({ + topic: 'kafka-topic' + messages: [{ value: 'Hello Kafka' }] + }) + } +} +``` + +The `.send` function of `KafkaProducer` is exactly the same as [the `.send` function of KafkaJS](https://kafka.js.org/docs/producing#producing-messages)。 + +### KafkaService + +Using the `KafkaService`, you can create `consumer` and `producer` like plain KafkaJS. + +```typescript +// app.service.js +import { OnModuleDestroy, OnModuleInit } from "@nestjs/common"; +import { Producer, ProducerRecord, RecordMetadata } from "kafkajs"; +import { KafkaService } from "@buka/nestjs-kafka"; + +@Injectable() +export class AppService implements OnModuleInit, OnModuleDestroy { + producer!: Producer; + consumer!: Consumer; + + constructor(private readonly kafka: KafkaService) {} + + async onModuleInit(): Promise { + this.producer = this.kafka.producer(); + await this.producer.connect(); + + this.consumer = this.kafka.consumer({ + groupId: "my-group-id", + }); + + this.consumer.subscribe({ topic: "kafka-topic" }); + this.consumer.run({ + eachMessage: async (context) => { + // do somethings + }, + }); + } + + async onModuleDestroy(): Promise { + await this.producer.disconnect(); + await this.consumer.disconnect(); + } +} +``` diff --git a/src/index.ts b/src/index.ts index 8a6bd48..9da7506 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,4 @@ +export { Partitioners } from 'kafkajs' export * from './decorator' export * from './interface' export * from './kafka-consumer.service' diff --git a/src/interface/kafka-module-for-producer.interface.ts b/src/interface/kafka-module-for-producer.interface.ts new file mode 100644 index 0000000..288aef6 --- /dev/null +++ b/src/interface/kafka-module-for-producer.interface.ts @@ -0,0 +1,8 @@ +import { ProducerConfig } from 'kafkajs' + +export interface KafkaModuleForProducerOptions extends ProducerConfig { + /** + * @default "default" + */ + name?: string +} diff --git a/src/kafka-producer.service.ts b/src/kafka-producer.service.ts index 552ff26..ead16e0 100644 --- a/src/kafka-producer.service.ts +++ b/src/kafka-producer.service.ts @@ -1,14 +1,18 @@ import { OnModuleDestroy, OnModuleInit } from '@nestjs/common' -import { Producer } from 'kafkajs' +import { Producer, ProducerRecord, RecordMetadata } from 'kafkajs' +import { KafkaModuleForProducerOptions } from './interface/kafka-module-for-producer.interface.js' import { KafkaService } from './kafka.service' -export class KafkaProducerService implements OnModuleInit, OnModuleDestroy { +export class KafkaProducer implements OnModuleInit, OnModuleDestroy { producer!: Producer - constructor(private readonly kafka: KafkaService) {} + constructor( + private readonly options: KafkaModuleForProducerOptions, + private readonly kafka: KafkaService + ) {} async onModuleInit(): Promise { - this.producer = this.kafka.producer() + this.producer = this.kafka.producer(this.options) await this.producer.connect() } @@ -16,13 +20,7 @@ export class KafkaProducerService implements OnModuleInit, OnModuleDestroy { await this.producer.disconnect() } - async send(topic: string, message: string): Promise { - const producer = this.kafka.producer() - await producer.connect() - await producer.send({ - topic, - messages: [{ value: message }], - }) - await producer.disconnect() + send(record: ProducerRecord): Promise { + return this.producer.send(record) } } diff --git a/src/kafka.module.ts b/src/kafka.module.ts index 7bc0bf7..07179e1 100644 --- a/src/kafka.module.ts +++ b/src/kafka.module.ts @@ -4,9 +4,11 @@ import { KafkaModuleOptions, KafkaModuleOptionsAsync, } from './interface' +import { KafkaModuleForProducerOptions } from './interface/kafka-module-for-producer.interface.js' import { KafkaConsumerService } from './kafka-consumer.service' -import { KafkaProducerService } from './kafka-producer.service' +import { KafkaProducer } from './kafka-producer.service' import { KafkaService } from './kafka.service' +import { getForProducerOptionsProvideName } from './utils/get-for-producer-options-provide-name.js' import { getKafkaConsumerServiceProvideName } from './utils/get-kafka-consumer-service-provide-name' import { getKafkaProducerServiceProvideName } from './utils/get-kafka-producer-service-provide-name' import { getKafkaServiceProvideName } from './utils/get-kafka-service-provide-name' @@ -19,7 +21,6 @@ export class KafkaModule { private static getProviders(name?: string): FactoryProvider[] { const optionsProvideName = getOptionsProvideName(name) const kafkaServiceProvideName = getKafkaServiceProvideName(name) - const kafkaProducerServiceProvideName = getKafkaProducerServiceProvideName(name) const kafkaConsumerServiceProvideName = getKafkaConsumerServiceProvideName(name) return [ @@ -28,11 +29,6 @@ export class KafkaModule { inject: [optionsProvideName], useFactory: (opts) => new KafkaService(opts), }, - { - provide: kafkaProducerServiceProvideName, - inject: [kafkaServiceProvideName], - useFactory: (kafkaService) => new KafkaProducerService(kafkaService), - }, { provide: kafkaConsumerServiceProvideName, inject: [ @@ -89,4 +85,27 @@ export class KafkaModule { exports: providers.map((item) => item.provide), } } + + static forProducer(options?: KafkaModuleForProducerOptions): DynamicModule { + const kafkaForProducerOptionsProvideName = getForProducerOptionsProvideName(options?.name) + const kafkaServiceProvideName = getKafkaServiceProvideName(options?.name) + const kafkaProducerServiceProvideName = getKafkaProducerServiceProvideName(options?.name) + + return { + global: true, + module: KafkaModule, + providers: [ + { + provide: kafkaForProducerOptionsProvideName, + useValue: options, + }, + { + provide: kafkaProducerServiceProvideName, + inject: [kafkaForProducerOptionsProvideName, kafkaServiceProvideName], + useFactory: (options, kafkaService) => new KafkaProducer(options, kafkaService), + }, + ], + exports: [kafkaProducerServiceProvideName], + } + } } diff --git a/src/kafka.service.ts b/src/kafka.service.ts index 2542476..bd6eeca 100644 --- a/src/kafka.service.ts +++ b/src/kafka.service.ts @@ -1,5 +1,5 @@ import { Injectable, Logger } from '@nestjs/common' -import { Consumer, ConsumerConfig, Kafka, Producer, logLevel } from 'kafkajs' +import { Consumer, ConsumerConfig, Kafka, Producer, ProducerConfig, logLevel } from 'kafkajs' import { KafkaModuleOptions } from './interface/kafka-module-options.interface' @Injectable() @@ -24,8 +24,8 @@ export class KafkaService { }) } - producer(): Producer { - return this.kafka.producer() + producer(config?: ProducerConfig): Producer { + return this.kafka.producer(config) } consumer(config: ConsumerConfig): Consumer { diff --git a/src/utils/get-for-producer-options-provide-name.ts b/src/utils/get-for-producer-options-provide-name.ts new file mode 100644 index 0000000..243c335 --- /dev/null +++ b/src/utils/get-for-producer-options-provide-name.ts @@ -0,0 +1,3 @@ +export function getForProducerOptionsProvideName(name?: string): string { + return `KafkaForProducerOptions.${name || 'default'}` +}