From c4253563bee47d2fac250e57a3139e5b723f85a7 Mon Sep 17 00:00:00 2001 From: Rithick Date: Thu, 29 Aug 2024 12:13:58 +0530 Subject: [PATCH] fix:kafka re-initialized --- .../src/infrastructure/kafka/index.ts | 4 +- .../kafka/consumers/createGroupChat.ts | 17 +-- .../individualChatCreationConsumer.ts | 19 ++-- .../src/infrastructure/kafka/index.ts | 35 +++--- .../src/infrastructure/socket/index.ts | 1 - .../src/infrastructure/kafka/index.ts | 4 +- notification-service/src/__boot/consumer.ts | 105 ++++++++++-------- .../kafka/consumers/userCreatedConsumer.ts | 20 ++-- .../src/infrastructure/kafka/index.ts | 4 +- .../infrastructure/messages/kafka/index.ts | 4 +- .../src/kafka/consumer/consumer.service.ts | 4 +- .../src/kafka/producer/producer.service.ts | 4 +- 12 files changed, 107 insertions(+), 114 deletions(-) diff --git a/auth-service/src/infrastructure/kafka/index.ts b/auth-service/src/infrastructure/kafka/index.ts index e73b6af..d7846c4 100644 --- a/auth-service/src/infrastructure/kafka/index.ts +++ b/auth-service/src/infrastructure/kafka/index.ts @@ -6,8 +6,8 @@ const kafka = new Kafka({ ssl: true, sasl: { mechanism: 'plain', - username: 'AH3AIXDBMRITWY2S', - password:'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj', + username: 'IGI4TMEEZDD5XDZG', + password:'KYygx3UkksOGC9+Iur1t5EPU3MlyQfY2qBgJ1zHfxW3leYtYefDoikTYcR8EjsPk', }, connectionTimeout: 30000, authenticationTimeout: 30000, diff --git a/chat-service/src/infrastructure/kafka/consumers/createGroupChat.ts b/chat-service/src/infrastructure/kafka/consumers/createGroupChat.ts index 7f8154e..fc5fe72 100644 --- a/chat-service/src/infrastructure/kafka/consumers/createGroupChat.ts +++ b/chat-service/src/infrastructure/kafka/consumers/createGroupChat.ts @@ -1,12 +1,7 @@ -import { createChat } from "../../database/mongoDB/repositories" +import { createChat } from "../../database/mongoDB/repositories"; - -export default async( - data:any -)=>{ - try { - await createChat(data) - } catch (error) { - - } -} \ No newline at end of file +export default async (data: any) => { + try { + await createChat(data); + } catch (error) {} +}; diff --git a/chat-service/src/infrastructure/kafka/consumers/individualChatCreationConsumer.ts b/chat-service/src/infrastructure/kafka/consumers/individualChatCreationConsumer.ts index dc9dee8..c5ceb2c 100644 --- a/chat-service/src/infrastructure/kafka/consumers/individualChatCreationConsumer.ts +++ b/chat-service/src/infrastructure/kafka/consumers/individualChatCreationConsumer.ts @@ -1,14 +1,9 @@ import { createChat } from "../../database/mongoDB/repositories"; - -export default async( - data:any -)=>{ - try { - await createChat(data) - } catch (error) { - console.log(error); - - } -} - +export default async (data: any) => { + try { + await createChat(data); + } catch (error) { + console.log(error); + } +}; diff --git a/chat-service/src/infrastructure/kafka/index.ts b/chat-service/src/infrastructure/kafka/index.ts index 2d88ee7..2574a6d 100644 --- a/chat-service/src/infrastructure/kafka/index.ts +++ b/chat-service/src/infrastructure/kafka/index.ts @@ -1,25 +1,26 @@ -import { Consumer, Kafka, Partitioners, Producer } from "kafkajs" +import { Consumer, Kafka, Partitioners, Producer } from "kafkajs"; const kafka = new Kafka({ - clientId: 'chat-service', - brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'], - ssl: true, - sasl: { - mechanism: 'plain', - username: "AH3AIXDBMRITWY2S", - password: 'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj', - }, - connectionTimeout: 30000, - authenticationTimeout: 30000, - }); + clientId: "chat-service", + brokers: ["pkc-4j8dq.southeastasia.azure.confluent.cloud:9092"], + ssl: true, + sasl: { + mechanism: "plain", + username: "IGI4TMEEZDD5XDZG", + password: "KYygx3UkksOGC9+Iur1t5EPU3MlyQfY2qBgJ1zHfxW3leYtYefDoikTYcR8EjsPk", + }, + connectionTimeout: 30000, + authenticationTimeout: 30000, +}); // export const kafka =new Kafka({ // clientId:'chat-service', // brokers:['34.93.145.38:29092'] // }) -export const producer:Producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner }); -export const consumer:Consumer=kafka.consumer({ - groupId:'chat-service-kafka-group' -}) - +export const producer: Producer = kafka.producer({ + createPartitioner: Partitioners.LegacyPartitioner, +}); +export const consumer: Consumer = kafka.consumer({ + groupId: "chat-service-kafka-group", +}); diff --git a/chat-service/src/infrastructure/socket/index.ts b/chat-service/src/infrastructure/socket/index.ts index ac3bfc3..48fcf6d 100644 --- a/chat-service/src/infrastructure/socket/index.ts +++ b/chat-service/src/infrastructure/socket/index.ts @@ -4,7 +4,6 @@ import { Server as IOServer } from "socket.io"; import { messageSeen, setLastSeen,saveNotification } from "../database/mongoDB/repositories"; const connectSocketIo = (server: Server) => { - console.log("🚀 ~ file: index.ts:7 ~ connectSocketIo ~ server:", server) const io = new IOServer(server, { cors: { origin: ["https://learnwise-client.vercel.app"], diff --git a/course-service/src/infrastructure/kafka/index.ts b/course-service/src/infrastructure/kafka/index.ts index d799fe5..dc20415 100644 --- a/course-service/src/infrastructure/kafka/index.ts +++ b/course-service/src/infrastructure/kafka/index.ts @@ -6,9 +6,9 @@ const kafka = new Kafka({ ssl: true, sasl: { mechanism: "plain", - username: "AH3AIXDBMRITWY2S", + username: "IGI4TMEEZDD5XDZG", password: - "PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj", + "KYygx3UkksOGC9+Iur1t5EPU3MlyQfY2qBgJ1zHfxW3leYtYefDoikTYcR8EjsPk", }, connectionTimeout: 30000, authenticationTimeout: 30000, diff --git a/notification-service/src/__boot/consumer.ts b/notification-service/src/__boot/consumer.ts index 805df0f..aca67bd 100644 --- a/notification-service/src/__boot/consumer.ts +++ b/notification-service/src/__boot/consumer.ts @@ -1,56 +1,65 @@ import { consumer } from "../infrastructure/kafka"; -import { INotificationSubscriber, createSubscriber } from "../infrastructure/kafka"; +import { + INotificationSubscriber, + createSubscriber, +} from "../infrastructure/kafka"; export const runConsumer = async () => { - try { - await consumer.connect(); - await consumer.subscribe({ topic: "notification-service-topic", fromBeginning: true }); - - const subscriber = createSubscriber(); - - await consumer.run({ - eachMessage: async ({ message }) => { - const { key, value } = message; - - if (!key) { - console.error("Message key is missing."); - return; - } - if (!value) { - console.error("Message value is missing."); - return; - } - - const subscriberMethod = convertKeyToMethodName(String(key));; - - if (typeof subscriber[subscriberMethod] !== 'function') { - console.error(`Method ${subscriberMethod} is not defined on subscriber.`); - return; - } - - try { - const subscriberData = JSON.parse(String(value)); - await subscriber[subscriberMethod](subscriberData); - } catch (error: any) { - console.error(`Error processing message with key ${key}: ${error.message}`); - } - } - }); - - } catch (error: any) { - console.error("Kafka Consume Error -> Notification : ", error.message); - } -} + try { + await consumer.connect(); + await consumer.subscribe({ + topic: "notification-service-topic", + fromBeginning: true, + }); + + const subscriber = createSubscriber(); + + await consumer.run({ + eachMessage: async ({ message }) => { + const { key, value } = message; + + if (!key) { + console.error("Message key is missing."); + return; + } + if (!value) { + console.error("Message value is missing."); + return; + } + + const subscriberMethod = convertKeyToMethodName(String(key)); + + if (typeof subscriber[subscriberMethod] !== "function") { + console.error( + `Method ${subscriberMethod} is not defined on subscriber.` + ); + return; + } + + try { + const subscriberData = JSON.parse(String(value)); + await subscriber[subscriberMethod](subscriberData); + } catch (error: any) { + console.error( + `Error processing message with key ${key}: ${error.message}` + ); + } + }, + }); + } catch (error: any) { + console.error("Kafka Consume Error -> Notification : ", error.message); + } +}; function convertKeyToMethodName(key: string): keyof INotificationSubscriber { - const keyMap: { [key: string]: keyof INotificationSubscriber } = { - USER_CREATED_MESSAGE: 'userCreated', - REQUEST_FORGOT_PASSWORD_MESSAGE: 'requestForgotPassword' - }; - return keyMap[key] || key; + const keyMap: { [key: string]: keyof INotificationSubscriber } = { + USER_CREATED_MESSAGE: "userCreated", + REQUEST_FORGOT_PASSWORD_MESSAGE: "requestForgotPassword", + }; + return keyMap[key] || key; } export const stopConsumer = async () => { - await consumer.stop(); - await consumer.disconnect(); -} + await consumer.stop(); + await consumer.disconnect(); +}; diff --git a/notification-service/src/infrastructure/kafka/consumers/userCreatedConsumer.ts b/notification-service/src/infrastructure/kafka/consumers/userCreatedConsumer.ts index 3849c9d..cdd92be 100644 --- a/notification-service/src/infrastructure/kafka/consumers/userCreatedConsumer.ts +++ b/notification-service/src/infrastructure/kafka/consumers/userCreatedConsumer.ts @@ -1,15 +1,9 @@ import { sendVerificationMail } from "../../../infrastructure/services"; -export default async ( - data:string -) => { - - try { - - await sendVerificationMail(data); - - } catch (error: any) { - console.log("user-created-consumed mail send error: ", error?.message); - } - -} \ No newline at end of file +export default async (data: string) => { + try { + await sendVerificationMail(data); + } catch (error: any) { + console.log("user-created-consumed mail send error: ", error?.message); + } +}; diff --git a/notification-service/src/infrastructure/kafka/index.ts b/notification-service/src/infrastructure/kafka/index.ts index a079af7..816fa51 100644 --- a/notification-service/src/infrastructure/kafka/index.ts +++ b/notification-service/src/infrastructure/kafka/index.ts @@ -7,9 +7,9 @@ const kafka = new Kafka({ ssl: true, sasl: { mechanism: 'plain', - username: 'AH3AIXDBMRITWY2S', + username: 'IGI4TMEEZDD5XDZG', password: - 'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj', + 'KYygx3UkksOGC9+Iur1t5EPU3MlyQfY2qBgJ1zHfxW3leYtYefDoikTYcR8EjsPk', }, connectionTimeout: 30000, authenticationTimeout: 30000, diff --git a/payment-service/src/infrastructure/messages/kafka/index.ts b/payment-service/src/infrastructure/messages/kafka/index.ts index 3cf0180..7ea454a 100644 --- a/payment-service/src/infrastructure/messages/kafka/index.ts +++ b/payment-service/src/infrastructure/messages/kafka/index.ts @@ -6,9 +6,9 @@ const kafka = new Kafka({ ssl: true, sasl: { mechanism: "plain", - username: "AH3AIXDBMRITWY2S", + username: "IGI4TMEEZDD5XDZG", password: - "PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj", + "KYygx3UkksOGC9+Iur1t5EPU3MlyQfY2qBgJ1zHfxW3leYtYefDoikTYcR8EjsPk", }, connectionTimeout: 30000, authenticationTimeout: 30000, diff --git a/user-service/src/kafka/consumer/consumer.service.ts b/user-service/src/kafka/consumer/consumer.service.ts index 02b5fe5..5eb1792 100644 --- a/user-service/src/kafka/consumer/consumer.service.ts +++ b/user-service/src/kafka/consumer/consumer.service.ts @@ -14,9 +14,9 @@ export class ConsumerService implements OnModuleInit { ssl: true, sasl: { mechanism: 'plain', - username: 'AH3AIXDBMRITWY2S', + username: 'IGI4TMEEZDD5XDZG', password: - 'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj', + 'KYygx3UkksOGC9+Iur1t5EPU3MlyQfY2qBgJ1zHfxW3leYtYefDoikTYcR8EjsPk', }, connectionTimeout: 30000, authenticationTimeout: 30000, diff --git a/user-service/src/kafka/producer/producer.service.ts b/user-service/src/kafka/producer/producer.service.ts index 1ece33a..260f4d7 100644 --- a/user-service/src/kafka/producer/producer.service.ts +++ b/user-service/src/kafka/producer/producer.service.ts @@ -20,9 +20,9 @@ export class ProducerService implements OnModuleInit, OnApplicationShutdown { ssl: true, sasl: { mechanism: 'plain', - username: 'AH3AIXDBMRITWY2S', + username: 'IGI4TMEEZDD5XDZG', password: - 'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj', + 'KYygx3UkksOGC9+Iur1t5EPU3MlyQfY2qBgJ1zHfxW3leYtYefDoikTYcR8EjsPk', }, connectionTimeout: 30000, authenticationTimeout: 30000,