Skip to content

Commit

Permalink
kafka changed to confluent
Browse files Browse the repository at this point in the history
  • Loading branch information
Rithick574 committed Jul 16, 2024
1 parent 4771758 commit 1292420
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 103 deletions.
31 changes: 17 additions & 14 deletions auth-service/src/infrastructure/kafka/index.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import { Kafka, Producer, Partitioners, Consumer } from "kafkajs";

// const kafka = new Kafka({
// clientId: 'auth-service',
// brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'],
// ssl: true,
// sasl: {
// mechanism: 'plain',
// username: 'AH3AIXDBMRITWY2S',
// password:'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj',
// },
// });

export const kafka = new Kafka({
const kafka = new Kafka({
clientId: 'auth-service',
brokers: ["34.93.145.38:29092"]
})
brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'],
ssl: true,
sasl: {
mechanism: 'plain',
username: 'AH3AIXDBMRITWY2S',
password:'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj',
},
connectionTimeout: 30000,
authenticationTimeout: 30000,

});

// export const kafka = new Kafka({
// clientId: 'auth-service',
// brokers: ["34.93.145.38:29092"]
// })

export const producer: Producer = kafka.producer({
createPartitioner: Partitioners.LegacyPartitioner,
Expand Down
31 changes: 16 additions & 15 deletions chat-service/src/infrastructure/kafka/index.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import { Consumer, Kafka, Partitioners, Producer } from "kafkajs"

// const kafka = new Kafka({
// clientId: 'chat-service',
// brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'],
// ssl: true,
// sasl: {
// mechanism: 'plain',
// username: "AH3AIXDBMRITWY2S",
// password: 'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj',
// },
// authenticationTimeout: 45000
// });
const kafka = new Kafka({
clientId: 'chat-service',
brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'],
ssl: true,
sasl: {
mechanism: 'plain',
username: "AH3AIXDBMRITWY2S",
password: 'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj',
},
connectionTimeout: 30000,
authenticationTimeout: 30000,
});

export const kafka =new Kafka({
clientId:'chat-service',
brokers:['34.93.145.38:29092']
})
// export const kafka =new Kafka({
// clientId:'chat-service',
// brokers:['34.93.145.38:29092']
// })

export const producer:Producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner });
export const consumer:Consumer=kafka.consumer({
Expand Down
30 changes: 16 additions & 14 deletions course-service/src/infrastructure/kafka/index.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
import { Kafka, Producer, Consumer } from "kafkajs";

// const kafka = new Kafka({
// clientId: "course-service",
// brokers: ["pkc-4j8dq.southeastasia.azure.confluent.cloud:9092"],
// ssl: true,
// sasl: {
// mechanism: "plain",
// username: "AH3AIXDBMRITWY2S",
// password:
// "PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj",
// },
// });


const kafka = new Kafka({
clientId: "course-service",
brokers: ["34.93.145.38:29092"],
brokers: ["pkc-4j8dq.southeastasia.azure.confluent.cloud:9092"],
ssl: true,
sasl: {
mechanism: "plain",
username: "AH3AIXDBMRITWY2S",
password:
"PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj",
},
connectionTimeout: 30000,
authenticationTimeout: 30000,
});


// const kafka = new Kafka({
// clientId: "course-service",
// brokers: ["34.93.145.38:29092"],
// });

export const producer: Producer = kafka.producer();
export const consumer: Consumer = kafka.consumer({
groupId: "course-service-kafka-group",
Expand Down
33 changes: 17 additions & 16 deletions notification-service/src/infrastructure/kafka/index.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
import { Kafka,Producer,Consumer,Partitioners } from "kafkajs";


// const kafka = new Kafka({
// clientId: 'cart-service',
// brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'],
// ssl: true,
// sasl: {
// mechanism: 'plain',
// username: 'AH3AIXDBMRITWY2S',
// password:
// 'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj',
// },
// authenticationTimeout: 45000
// });
const kafka = new Kafka({
clientId: 'cart-service',
brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'],
ssl: true,
sasl: {
mechanism: 'plain',
username: 'AH3AIXDBMRITWY2S',
password:
'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj',
},
connectionTimeout: 30000,
authenticationTimeout: 30000,
});


export const kafka = new Kafka({
clientId: 'cart-service',
brokers: ["34.93.145.38:29092"]
})
// export const kafka = new Kafka({
// clientId: 'cart-service',
// brokers: ["34.93.145.38:29092"]
// })

export const producer:Producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner });
export const consumer:Consumer = kafka.consumer({
Expand Down
28 changes: 15 additions & 13 deletions payment-service/src/infrastructure/messages/kafka/index.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
import { Kafka, Producer,Partitioners,Consumer } from "kafkajs";

// const kafka = new Kafka({
// clientId: "payment-service",
// brokers: ["pkc-4j8dq.southeastasia.azure.confluent.cloud:9092"],
// ssl: true,
// sasl: {
// mechanism: "plain",
// username: "AH3AIXDBMRITWY2S",
// password:
// "PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj",
// },
// });

const kafka = new Kafka({
clientId: "payment-service",
brokers: ["34.93.145.38:29092"],
brokers: ["pkc-4j8dq.southeastasia.azure.confluent.cloud:9092"],
ssl: true,
sasl: {
mechanism: "plain",
username: "AH3AIXDBMRITWY2S",
password:
"PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj",
},
connectionTimeout: 30000,
authenticationTimeout: 30000,
});

// const kafka = new Kafka({
// clientId: "payment-service",
// brokers: ["34.93.145.38:29092"],
// });

export const producer: Producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner });
export const consumer: Consumer = kafka.consumer({
groupId: "payment-service-kafka-group",
Expand Down
34 changes: 18 additions & 16 deletions user-service/src/kafka/consumer/consumer.service.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { UsersService } from 'src/users/users.service';
import { UsersService } from 'src/users/users.service';

@Injectable()
export class ConsumerService implements OnModuleInit {
private readonly kafka = new Kafka({
brokers: ['34.93.145.38:29092'],
});

// private readonly kafka = new Kafka({
// clientId: "user-service",
// brokers: ["pkc-4j8dq.southeastasia.azure.confluent.cloud:9092"],
// ssl: true,
// sasl: {
// mechanism: "plain",
// username: "AH3AIXDBMRITWY2S",
// password:
// "PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj",
// },
// brokers: ['34.93.145.38:29092'],
// });

private readonly kafka = new Kafka({
clientId: 'user-service',
brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'],
ssl: true,
sasl: {
mechanism: 'plain',
username: 'AH3AIXDBMRITWY2S',
password:
'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj',
},
connectionTimeout: 30000,
authenticationTimeout: 30000,
});

private readonly consumer: Consumer;

constructor(private readonly usersService: UsersService) {
Expand All @@ -36,13 +38,13 @@ export class ConsumerService implements OnModuleInit {
if (messageValue) {
try {
const userData = JSON.parse(messageValue);
console.log('Parsed userData:', userData);
console.log('Parsed userData:', userData);
await this.usersService.addUser(userData);
} catch (error) {
console.error('Error processing message', error);
}
}
}
},
});
}

Expand Down
32 changes: 17 additions & 15 deletions user-service/src/kafka/producer/producer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,25 @@ import { Kafka, Producer, ProducerRecord } from 'kafkajs';

@Injectable()
export class ProducerService implements OnModuleInit, OnApplicationShutdown {
private readonly kafka = new Kafka({
brokers: ['34.93.145.38:29092'],
});
// private readonly kafka = new Kafka({
// brokers: ['localhost:29092'],
// brokers: ['34.93.145.38:29092'],
// });
// private readonly kafka = new Kafka({
// clientId: "user-service",
// brokers: ["pkc-4j8dq.southeastasia.azure.confluent.cloud:9092"],
// ssl: true,
// sasl: {
// mechanism: "plain",
// username: "AH3AIXDBMRITWY2S",
// password:
// "PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj",
// },
// brokers: ['localhost:29092'],
// });
private readonly kafka = new Kafka({
clientId: 'user-service',
brokers: ['pkc-4j8dq.southeastasia.azure.confluent.cloud:9092'],
ssl: true,
sasl: {
mechanism: 'plain',
username: 'AH3AIXDBMRITWY2S',
password:
'PTTwXBptxZjyOa3DLtmSIjgC3mg8AZG8o1MB0pShQvbNX7bTC07O8HcgLAi+sqUj',
},
connectionTimeout: 30000,
authenticationTimeout: 30000,
});

private readonly producer: Producer = this.kafka.producer();
private readonly logger = new Logger(ProducerService.name);
Expand All @@ -48,13 +50,13 @@ export class ProducerService implements OnModuleInit, OnApplicationShutdown {
}
}

async produce(record:ProducerRecord) {
async produce(record: ProducerRecord) {
try {
await this.producer.send(record);
this.logger.log(`Message sent to ${record.topic}`);
} catch (error) {
this.logger.error(`Failed to send message to ${record.topic}:`, error);
throw error;
throw error;
}
}
}

0 comments on commit 1292420

Please sign in to comment.