diff --git a/auth-service/src/infrastructure/kafka/index.ts b/auth-service/src/infrastructure/kafka/index.ts index 74625b0..e73b6af 100644 --- a/auth-service/src/infrastructure/kafka/index.ts +++ b/auth-service/src/infrastructure/kafka/index.ts @@ -1,20 +1,23 @@ import { Kafka, Producer, Partitioners, Consumer } from "kafkajs"; -// const kafka = new Kafka({ -// clientId: 'auth-service', -// brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'], -// ssl: true, -// sasl: { -// mechanism: 'plain', -// username: 'AH3AIXDBMRITWY2S', -// password:'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj', -// }, -// }); - -export const kafka = new Kafka({ +const kafka = new Kafka({ clientId: 'auth-service', - brokers: ["34.93.145.38:29092"] -}) + brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'], + ssl: true, + sasl: { + mechanism: 'plain', + username: 'AH3AIXDBMRITWY2S', + password:'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj', + }, + connectionTimeout: 30000, + authenticationTimeout: 30000, + +}); + +// export const kafka = new Kafka({ +// clientId: 'auth-service', +// brokers: ["34.93.145.38:29092"] +// }) export const producer: Producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner, diff --git a/chat-service/src/infrastructure/kafka/index.ts b/chat-service/src/infrastructure/kafka/index.ts index 05db9a3..2d88ee7 100644 --- a/chat-service/src/infrastructure/kafka/index.ts +++ b/chat-service/src/infrastructure/kafka/index.ts @@ -1,21 +1,22 @@ import { Consumer, Kafka, Partitioners, Producer } from "kafkajs" -// const kafka = new Kafka({ -// clientId: 'chat-service', -// brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'], -// ssl: true, -// sasl: { -// mechanism: 'plain', -// username: "AH3AIXDBMRITWY2S", -// password: 'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj', -// }, -// authenticationTimeout: 45000 -// }); +const kafka = new Kafka({ + clientId: 'chat-service', + brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'], + ssl: true, + sasl: { + mechanism: 'plain', + username: "AH3AIXDBMRITWY2S", + password: 'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj', + }, + connectionTimeout: 30000, + authenticationTimeout: 30000, + }); -export const kafka =new Kafka({ - clientId:'chat-service', - brokers:['34.93.145.38:29092'] -}) +// export const kafka =new Kafka({ +// clientId:'chat-service', +// brokers:['34.93.145.38:29092'] +// }) export const producer:Producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner }); export const consumer:Consumer=kafka.consumer({ diff --git a/course-service/src/infrastructure/kafka/index.ts b/course-service/src/infrastructure/kafka/index.ts index b421798..d799fe5 100644 --- a/course-service/src/infrastructure/kafka/index.ts +++ b/course-service/src/infrastructure/kafka/index.ts @@ -1,23 +1,25 @@ import { Kafka, Producer, Consumer } from "kafkajs"; -// const kafka = new Kafka({ -// clientId: "course-service", -// brokers: ["pkc-4j8dq.southeastasia.azure.confluent.cloud:9092"], -// ssl: true, -// sasl: { -// mechanism: "plain", -// username: "AH3AIXDBMRITWY2S", -// password: -// "PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj", -// }, -// }); - - const kafka = new Kafka({ clientId: "course-service", - brokers: ["34.93.145.38:29092"], + brokers: ["pkc-4j8dq.southeastasia.azure.confluent.cloud:9092"], + ssl: true, + sasl: { + mechanism: "plain", + username: "AH3AIXDBMRITWY2S", + password: + "PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj", + }, + connectionTimeout: 30000, + authenticationTimeout: 30000, }); + +// const kafka = new Kafka({ +// clientId: "course-service", +// brokers: ["34.93.145.38:29092"], +// }); + export const producer: Producer = kafka.producer(); export const consumer: Consumer = kafka.consumer({ groupId: "course-service-kafka-group", diff --git a/notification-service/src/infrastructure/kafka/index.ts b/notification-service/src/infrastructure/kafka/index.ts index b8c4a8d..a079af7 100644 --- a/notification-service/src/infrastructure/kafka/index.ts +++ b/notification-service/src/infrastructure/kafka/index.ts @@ -1,24 +1,25 @@ import { Kafka,Producer,Consumer,Partitioners } from "kafkajs"; -// const kafka = new Kafka({ -// clientId: 'cart-service', -// brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'], -// ssl: true, -// sasl: { -// mechanism: 'plain', -// username: 'AH3AIXDBMRITWY2S', -// password: -// 'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj', -// }, -// authenticationTimeout: 45000 -// }); +const kafka = new Kafka({ + clientId: 'cart-service', + brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'], + ssl: true, + sasl: { + mechanism: 'plain', + username: 'AH3AIXDBMRITWY2S', + password: + 'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj', + }, + connectionTimeout: 30000, + authenticationTimeout: 30000, +}); -export const kafka = new Kafka({ - clientId: 'cart-service', - brokers: ["34.93.145.38:29092"] - }) +// export const kafka = new Kafka({ +// clientId: 'cart-service', +// brokers: ["34.93.145.38:29092"] +// }) export const producer:Producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner }); export const consumer:Consumer = kafka.consumer({ diff --git a/payment-service/src/infrastructure/messages/kafka/index.ts b/payment-service/src/infrastructure/messages/kafka/index.ts index 528e0fd..3cf0180 100644 --- a/payment-service/src/infrastructure/messages/kafka/index.ts +++ b/payment-service/src/infrastructure/messages/kafka/index.ts @@ -1,22 +1,24 @@ import { Kafka, Producer,Partitioners,Consumer } from "kafkajs"; -// const kafka = new Kafka({ -// clientId: "payment-service", -// brokers: ["pkc-4j8dq.southeastasia.azure.confluent.cloud:9092"], -// ssl: true, -// sasl: { -// mechanism: "plain", -// username: "AH3AIXDBMRITWY2S", -// password: -// "PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj", -// }, -// }); - const kafka = new Kafka({ clientId: "payment-service", - brokers: ["34.93.145.38:29092"], + brokers: ["pkc-4j8dq.southeastasia.azure.confluent.cloud:9092"], + ssl: true, + sasl: { + mechanism: "plain", + username: "AH3AIXDBMRITWY2S", + password: + "PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj", + }, + connectionTimeout: 30000, + authenticationTimeout: 30000, }); +// const kafka = new Kafka({ +// clientId: "payment-service", +// brokers: ["34.93.145.38:29092"], +// }); + export const producer: Producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner }); export const consumer: Consumer = kafka.consumer({ groupId: "payment-service-kafka-group", diff --git a/user-service/src/kafka/consumer/consumer.service.ts b/user-service/src/kafka/consumer/consumer.service.ts index 4e87806..02b5fe5 100644 --- a/user-service/src/kafka/consumer/consumer.service.ts +++ b/user-service/src/kafka/consumer/consumer.service.ts @@ -1,25 +1,27 @@ import { Injectable, OnModuleInit } from '@nestjs/common'; import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'; -import { UsersService } from 'src/users/users.service'; +import { UsersService } from 'src/users/users.service'; @Injectable() export class ConsumerService implements OnModuleInit { - private readonly kafka = new Kafka({ - brokers: ['34.93.145.38:29092'], - }); - // private readonly kafka = new Kafka({ - // clientId: "user-service", - // brokers: ["pkc-4j8dq.southeastasia.azure.confluent.cloud:9092"], - // ssl: true, - // sasl: { - // mechanism: "plain", - // username: "AH3AIXDBMRITWY2S", - // password: - // "PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj", - // }, + // brokers: ['34.93.145.38:29092'], // }); + private readonly kafka = new Kafka({ + clientId: 'user-service', + brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'], + ssl: true, + sasl: { + mechanism: 'plain', + username: 'AH3AIXDBMRITWY2S', + password: + 'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj', + }, + connectionTimeout: 30000, + authenticationTimeout: 30000, + }); + private readonly consumer: Consumer; constructor(private readonly usersService: UsersService) { @@ -36,13 +38,13 @@ export class ConsumerService implements OnModuleInit { if (messageValue) { try { const userData = JSON.parse(messageValue); - console.log('Parsed userData:', userData); + console.log('Parsed userData:', userData); await this.usersService.addUser(userData); } catch (error) { console.error('Error processing message', error); } } - } + }, }); } diff --git a/user-service/src/kafka/producer/producer.service.ts b/user-service/src/kafka/producer/producer.service.ts index 2aa4c19..1ece33a 100644 --- a/user-service/src/kafka/producer/producer.service.ts +++ b/user-service/src/kafka/producer/producer.service.ts @@ -8,23 +8,25 @@ import { Kafka, Producer, ProducerRecord } from 'kafkajs'; @Injectable() export class ProducerService implements OnModuleInit, OnApplicationShutdown { - private readonly kafka = new Kafka({ - brokers: ['34.93.145.38:29092'], - }); // private readonly kafka = new Kafka({ - // brokers: ['localhost:29092'], + // brokers: ['34.93.145.38:29092'], // }); // private readonly kafka = new Kafka({ - // clientId: "user-service", - // brokers: ["pkc-4j8dq.southeastasia.azure.confluent.cloud:9092"], - // ssl: true, - // sasl: { - // mechanism: "plain", - // username: "AH3AIXDBMRITWY2S", - // password: - // "PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj", - // }, + // brokers: ['localhost:29092'], // }); + private readonly kafka = new Kafka({ + clientId: 'user-service', + brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'], + ssl: true, + sasl: { + mechanism: 'plain', + username: 'AH3AIXDBMRITWY2S', + password: + 'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj', + }, + connectionTimeout: 30000, + authenticationTimeout: 30000, + }); private readonly producer: Producer = this.kafka.producer(); private readonly logger = new Logger(ProducerService.name); @@ -48,13 +50,13 @@ export class ProducerService implements OnModuleInit, OnApplicationShutdown { } } - async produce(record:ProducerRecord) { + async produce(record: ProducerRecord) { try { await this.producer.send(record); this.logger.log(`Message sent to ${record.topic}`); } catch (error) { this.logger.error(`Failed to send message to ${record.topic}:`, error); - throw error; + throw error; } } }