Skip to content

Commit

Permalink
feat: add KafkaProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
Val-istar-Guo committed Jan 23, 2024
1 parent f785fc6 commit 36caac2
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 25 deletions.
103 changes: 100 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ That support multiple connections and fits the coding style of nestjs.

## Usage

Import `KafkaModule`:
Import `KafkaModule.forRoot` to `AppModule`:

```typescript
// app.module.js
Expand All @@ -33,7 +33,9 @@ import { KafkaModule } from "@buka/nestjs-kafka";
export class AppModule {}
```

Add a provider named `AppConsumer` that consume message:
### KafkaConsumer

Create a provider named `AppConsumer` that consume messages:

```typescript
// app.consumer.js
Expand All @@ -56,7 +58,9 @@ export class AppConsumer {
}
```

Append `AppConsumer` to `AppModule`:
> `AppConsumer` and `AppService` can be merged into one provider, but writing them separately will make the code clearer.
Then, append `AppConsumer` to `AppModule`:

```typescript
import { Module } from "@nestjs/common";
Expand All @@ -70,3 +74,96 @@ import { AppConsumer } from "./app.consumer";
})
export class AppModule {}
```

### KafkaProducer

`KafkaProducer` will connect on module init and disconnect on module destroy.
To use this, import `KafkaModule.forProducer(options)` to `AppModule`:

```typescript
// app.module.js
import { Module } from "@nestjs/common";
import { KafkaModule, Partitioners } from "@buka/nestjs-kafka";
import AppService from "./app.service";

@Module({
imports: [
KafkaModule.forRoot({
name: "my-kafka",
groupId: "my-group-id",
clientId: "my-client-id",
brokers: ["my_kafka_host:9092"],
}),
KafkaModule.forProducer({
name: "my-kafka",
createPartitioner: Partitioners.LegacyPartitioner,
}),
],
provider: [AppService],
})
export class AppModule {}
```

> The `options` of `.forProducer` is exactly the same as [the `options` of `kafka.producer` in KafkaJS](https://kafka.js.org/docs/producing)
Inject `KafkaProducer` to your `AppService`:

```typescript
// app.service.js
@Injectable()
export class AppService {
constructor(
@InjectKafkaProducer('my-kafka')
private readonly producer: KafkaProducer
) {}

async sendMessage() {
this.producer.send({
topic: 'kafka-topic'
messages: [{ value: 'Hello Kafka' }]
})
}
}
```

The `.send` function of `KafkaProducer` is exactly the same as [the `.send` function of KafkaJS](https://kafka.js.org/docs/producing#producing-messages)

### KafkaService

Using the `KafkaService`, you can create `consumer` and `producer` like plain KafkaJS.

```typescript
// app.service.js
import { OnModuleDestroy, OnModuleInit } from "@nestjs/common";
import { Producer, ProducerRecord, RecordMetadata } from "kafkajs";
import { KafkaService } from "@buka/nestjs-kafka";

@Injectable()
export class AppService implements OnModuleInit, OnModuleDestroy {
producer!: Producer;
consumer!: Consumer;

constructor(private readonly kafka: KafkaService) {}

async onModuleInit(): Promise<void> {
this.producer = this.kafka.producer();
await this.producer.connect();

this.consumer = this.kafka.consumer({
groupId: "my-group-id",
});

this.consumer.subscribe({ topic: "kafka-topic" });
this.consumer.run({
eachMessage: async (context) => {
// do somethings
},
});
}

async onModuleDestroy(): Promise<void> {
await this.producer.disconnect();
await this.consumer.disconnect();
}
}
```
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export { Partitioners } from 'kafkajs'
export * from './decorator'
export * from './interface'
export * from './kafka-consumer.service'
Expand Down
8 changes: 8 additions & 0 deletions src/interface/kafka-module-for-producer.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { ProducerConfig } from 'kafkajs'

export interface KafkaModuleForProducerOptions extends ProducerConfig {
/**
* @default "default"
*/
name?: string
}
22 changes: 10 additions & 12 deletions src/kafka-producer.service.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
import { OnModuleDestroy, OnModuleInit } from '@nestjs/common'
import { Producer } from 'kafkajs'
import { Producer, ProducerRecord, RecordMetadata } from 'kafkajs'
import { KafkaModuleForProducerOptions } from './interface/kafka-module-for-producer.interface.js'
import { KafkaService } from './kafka.service'


export class KafkaProducerService implements OnModuleInit, OnModuleDestroy {
export class KafkaProducer implements OnModuleInit, OnModuleDestroy {
producer!: Producer
constructor(private readonly kafka: KafkaService) {}
constructor(
private readonly options: KafkaModuleForProducerOptions,
private readonly kafka: KafkaService
) {}

async onModuleInit(): Promise<void> {
this.producer = this.kafka.producer()
this.producer = this.kafka.producer(this.options)
await this.producer.connect()
}

async onModuleDestroy(): Promise<void> {
await this.producer.disconnect()
}

async send(topic: string, message: string): Promise<any> {
const producer = this.kafka.producer()
await producer.connect()
await producer.send({
topic,
messages: [{ value: message }],
})
await producer.disconnect()
send(record: ProducerRecord): Promise<RecordMetadata[]> {
return this.producer.send(record)
}
}
33 changes: 26 additions & 7 deletions src/kafka.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import {
KafkaModuleOptions,
KafkaModuleOptionsAsync,
} from './interface'
import { KafkaModuleForProducerOptions } from './interface/kafka-module-for-producer.interface.js'
import { KafkaConsumerService } from './kafka-consumer.service'
import { KafkaProducerService } from './kafka-producer.service'
import { KafkaProducer } from './kafka-producer.service'
import { KafkaService } from './kafka.service'
import { getForProducerOptionsProvideName } from './utils/get-for-producer-options-provide-name.js'
import { getKafkaConsumerServiceProvideName } from './utils/get-kafka-consumer-service-provide-name'
import { getKafkaProducerServiceProvideName } from './utils/get-kafka-producer-service-provide-name'
import { getKafkaServiceProvideName } from './utils/get-kafka-service-provide-name'
Expand All @@ -19,7 +21,6 @@ export class KafkaModule {
private static getProviders(name?: string): FactoryProvider[] {
const optionsProvideName = getOptionsProvideName(name)
const kafkaServiceProvideName = getKafkaServiceProvideName(name)
const kafkaProducerServiceProvideName = getKafkaProducerServiceProvideName(name)
const kafkaConsumerServiceProvideName = getKafkaConsumerServiceProvideName(name)

return [
Expand All @@ -28,11 +29,6 @@ export class KafkaModule {
inject: [optionsProvideName],
useFactory: (opts) => new KafkaService(opts),
},
{
provide: kafkaProducerServiceProvideName,
inject: [kafkaServiceProvideName],
useFactory: (kafkaService) => new KafkaProducerService(kafkaService),
},
{
provide: kafkaConsumerServiceProvideName,
inject: [
Expand Down Expand Up @@ -89,4 +85,27 @@ export class KafkaModule {
exports: providers.map((item) => item.provide),
}
}

static forProducer(options?: KafkaModuleForProducerOptions): DynamicModule {
const kafkaForProducerOptionsProvideName = getForProducerOptionsProvideName(options?.name)
const kafkaServiceProvideName = getKafkaServiceProvideName(options?.name)
const kafkaProducerServiceProvideName = getKafkaProducerServiceProvideName(options?.name)

return {
global: true,
module: KafkaModule,
providers: [
{
provide: kafkaForProducerOptionsProvideName,
useValue: options,
},
{
provide: kafkaProducerServiceProvideName,
inject: [kafkaForProducerOptionsProvideName, kafkaServiceProvideName],
useFactory: (options, kafkaService) => new KafkaProducer(options, kafkaService),
},
],
exports: [kafkaProducerServiceProvideName],
}
}
}
6 changes: 3 additions & 3 deletions src/kafka.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Injectable, Logger } from '@nestjs/common'
import { Consumer, ConsumerConfig, Kafka, Producer, logLevel } from 'kafkajs'
import { Consumer, ConsumerConfig, Kafka, Producer, ProducerConfig, logLevel } from 'kafkajs'
import { KafkaModuleOptions } from './interface/kafka-module-options.interface'

@Injectable()
Expand All @@ -24,8 +24,8 @@ export class KafkaService {
})
}

producer(): Producer {
return this.kafka.producer()
producer(config?: ProducerConfig): Producer {
return this.kafka.producer(config)
}

consumer(config: ConsumerConfig): Consumer {
Expand Down
3 changes: 3 additions & 0 deletions src/utils/get-for-producer-options-provide-name.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export function getForProducerOptionsProvideName(name?: string): string {
return `KafkaForProducerOptions.${name || 'default'}`
}

0 comments on commit 36caac2

Please sign in to comment.