Skip to content

Commit

Permalink
fix:kafka re-initialized
Browse files Browse the repository at this point in the history
  • Loading branch information
Rithick574 committed Aug 29, 2024
1 parent 1292420 commit c425356
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 114 deletions.
4 changes: 2 additions & 2 deletions auth-service/src/infrastructure/kafka/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ const kafka = new Kafka({
ssl: true,
sasl: {
mechanism: 'plain',
username: 'AH3AIXDBMRITWY2S',
password:'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj',
username: 'IGI4TMEEZDD5XDZG',
password:'KYygx3UkksOGC9+Iur1t5EPU3MlyQfY2qBgJ1zHfxW3leYtYefDoikTYcR8EjsPk',
},
connectionTimeout: 30000,
authenticationTimeout: 30000,
Expand Down
17 changes: 6 additions & 11 deletions chat-service/src/infrastructure/kafka/consumers/createGroupChat.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
import { createChat } from "../../database/mongoDB/repositories"
import { createChat } from "../../database/mongoDB/repositories";


export default async(
data:any
)=>{
try {
await createChat(data)
} catch (error) {

}
}
export default async (data: any) => {
try {
await createChat(data);
} catch (error) {}
};
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
import { createChat } from "../../database/mongoDB/repositories";


export default async(
data:any
)=>{
try {
await createChat(data)
} catch (error) {
console.log(error);

}
}

export default async (data: any) => {
try {
await createChat(data);
} catch (error) {
console.log(error);
}
};
35 changes: 18 additions & 17 deletions chat-service/src/infrastructure/kafka/index.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
import { Consumer, Kafka, Partitioners, Producer } from "kafkajs"
import { Consumer, Kafka, Partitioners, Producer } from "kafkajs";

const kafka = new Kafka({
clientId: 'chat-service',
brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'],
ssl: true,
sasl: {
mechanism: 'plain',
username: "AH3AIXDBMRITWY2S",
password: 'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj',
},
connectionTimeout: 30000,
authenticationTimeout: 30000,
});
clientId: "chat-service",
brokers: ["pkc-4j8dq.southeastasia.azure.confluent.cloud:9092"],
ssl: true,
sasl: {
mechanism: "plain",
username: "IGI4TMEEZDD5XDZG",
password: "KYygx3UkksOGC9+Iur1t5EPU3MlyQfY2qBgJ1zHfxW3leYtYefDoikTYcR8EjsPk",
},
connectionTimeout: 30000,
authenticationTimeout: 30000,
});

// export const kafka =new Kafka({
// clientId:'chat-service',
// brokers:['34.93.145.38:29092']
// })

export const producer:Producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner });
export const consumer:Consumer=kafka.consumer({
groupId:'chat-service-kafka-group'
})

export const producer: Producer = kafka.producer({
createPartitioner: Partitioners.LegacyPartitioner,
});
export const consumer: Consumer = kafka.consumer({
groupId: "chat-service-kafka-group",
});
1 change: 0 additions & 1 deletion chat-service/src/infrastructure/socket/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { Server as IOServer } from "socket.io";
import { messageSeen, setLastSeen,saveNotification } from "../database/mongoDB/repositories";

const connectSocketIo = (server: Server) => {
console.log("🚀 ~ file: index.ts:7 ~ connectSocketIo ~ server:", server)
const io = new IOServer(server, {
cors: {
origin: ["https://learnwise-client.vercel.app"],
Expand Down
4 changes: 2 additions & 2 deletions course-service/src/infrastructure/kafka/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ const kafka = new Kafka({
ssl: true,
sasl: {
mechanism: "plain",
username: "AH3AIXDBMRITWY2S",
username: "IGI4TMEEZDD5XDZG",
password:
"PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj",
"KYygx3UkksOGC9+Iur1t5EPU3MlyQfY2qBgJ1zHfxW3leYtYefDoikTYcR8EjsPk",
},
connectionTimeout: 30000,
authenticationTimeout: 30000,
Expand Down
105 changes: 57 additions & 48 deletions notification-service/src/__boot/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,56 +1,65 @@
import { consumer } from "../infrastructure/kafka";
import { INotificationSubscriber, createSubscriber } from "../infrastructure/kafka";
import {
INotificationSubscriber,
createSubscriber,
} from "../infrastructure/kafka";

export const runConsumer = async () => {
try {
await consumer.connect();
await consumer.subscribe({ topic: "notification-service-topic", fromBeginning: true });

const subscriber = createSubscriber();

await consumer.run({
eachMessage: async ({ message }) => {
const { key, value } = message;

if (!key) {
console.error("Message key is missing.");
return;
}
if (!value) {
console.error("Message value is missing.");
return;
}

const subscriberMethod = convertKeyToMethodName(String(key));;

if (typeof subscriber[subscriberMethod] !== 'function') {
console.error(`Method ${subscriberMethod} is not defined on subscriber.`);
return;
}

try {
const subscriberData = JSON.parse(String(value));
await subscriber[subscriberMethod](subscriberData);
} catch (error: any) {
console.error(`Error processing message with key ${key}: ${error.message}`);
}
}
});

} catch (error: any) {
console.error("Kafka Consume Error -> Notification : ", error.message);
}
}
try {
await consumer.connect();
await consumer.subscribe({
topic: "notification-service-topic",
fromBeginning: true,
});

const subscriber = createSubscriber();

await consumer.run({
eachMessage: async ({ message }) => {
const { key, value } = message;

if (!key) {
console.error("Message key is missing.");
return;
}
if (!value) {
console.error("Message value is missing.");
return;
}

const subscriberMethod = convertKeyToMethodName(String(key));

if (typeof subscriber[subscriberMethod] !== "function") {
console.error(
`Method ${subscriberMethod} is not defined on subscriber.`
);
return;
}

try {
const subscriberData = JSON.parse(String(value));
await subscriber[subscriberMethod](subscriberData);
} catch (error: any) {
console.error(
`Error processing message with key ${key}: ${error.message}`
);
}
},
});
} catch (error: any) {
console.error("Kafka Consume Error -> Notification : ", error.message);
}
};

function convertKeyToMethodName(key: string): keyof INotificationSubscriber {
const keyMap: { [key: string]: keyof INotificationSubscriber } = {
USER_CREATED_MESSAGE: 'userCreated',
REQUEST_FORGOT_PASSWORD_MESSAGE: 'requestForgotPassword'
};
return keyMap[key] || key;
const keyMap: { [key: string]: keyof INotificationSubscriber } = {
USER_CREATED_MESSAGE: "userCreated",
REQUEST_FORGOT_PASSWORD_MESSAGE: "requestForgotPassword",
};
return keyMap[key] || key;
}

export const stopConsumer = async () => {
await consumer.stop();
await consumer.disconnect();
}
await consumer.stop();
await consumer.disconnect();
};
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
import { sendVerificationMail } from "../../../infrastructure/services";

export default async (
data:string
) => {

try {

await sendVerificationMail(data);

} catch (error: any) {
console.log("user-created-consumed mail send error: ", error?.message);
}

}
export default async (data: string) => {
try {
await sendVerificationMail(data);
} catch (error: any) {
console.log("user-created-consumed mail send error: ", error?.message);
}
};
4 changes: 2 additions & 2 deletions notification-service/src/infrastructure/kafka/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ const kafka = new Kafka({
ssl: true,
sasl: {
mechanism: 'plain',
username: 'AH3AIXDBMRITWY2S',
username: 'IGI4TMEEZDD5XDZG',
password:
'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj',
'KYygx3UkksOGC9+Iur1t5EPU3MlyQfY2qBgJ1zHfxW3leYtYefDoikTYcR8EjsPk',
},
connectionTimeout: 30000,
authenticationTimeout: 30000,
Expand Down
4 changes: 2 additions & 2 deletions payment-service/src/infrastructure/messages/kafka/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ const kafka = new Kafka({
ssl: true,
sasl: {
mechanism: "plain",
username: "AH3AIXDBMRITWY2S",
username: "IGI4TMEEZDD5XDZG",
password:
"PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj",
"KYygx3UkksOGC9+Iur1t5EPU3MlyQfY2qBgJ1zHfxW3leYtYefDoikTYcR8EjsPk",
},
connectionTimeout: 30000,
authenticationTimeout: 30000,
Expand Down
4 changes: 2 additions & 2 deletions user-service/src/kafka/consumer/consumer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ export class ConsumerService implements OnModuleInit {
ssl: true,
sasl: {
mechanism: 'plain',
username: 'AH3AIXDBMRITWY2S',
username: 'IGI4TMEEZDD5XDZG',
password:
'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj',
'KYygx3UkksOGC9+Iur1t5EPU3MlyQfY2qBgJ1zHfxW3leYtYefDoikTYcR8EjsPk',
},
connectionTimeout: 30000,
authenticationTimeout: 30000,
Expand Down
4 changes: 2 additions & 2 deletions user-service/src/kafka/producer/producer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ export class ProducerService implements OnModuleInit, OnApplicationShutdown {
ssl: true,
sasl: {
mechanism: 'plain',
username: 'AH3AIXDBMRITWY2S',
username: 'IGI4TMEEZDD5XDZG',
password:
'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj',
'KYygx3UkksOGC9+Iur1t5EPU3MlyQfY2qBgJ1zHfxW3leYtYefDoikTYcR8EjsPk',
},
connectionTimeout: 30000,
authenticationTimeout: 30000,
Expand Down

0 comments on commit c425356

Please sign in to comment.