Skip to content

Commit

Permalink
[CMDCT-3956] Kafka + Rates (#2470)
Browse files Browse the repository at this point in the history
Co-authored-by: ailZhou <aileen.zhou@coforma.io>
  • Loading branch information
BearHanded and ailZhou authored Oct 25, 2024
1 parent f49204b commit 9c67301
Show file tree
Hide file tree
Showing 18 changed files with 1,606 additions and 174 deletions.
12 changes: 4 additions & 8 deletions serverless-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,27 @@ services:
# wave 1: no dependencies
database:
path: services/database
topics:
path: services/topics
ui:
path: services/ui

# wave 2: depends on database
uploads:
path: services/uploads
params:
CoreSetTableName: ${database.CoreSetTableName}
MeasureTableName: ${database.MeasureTableName}
MeasureTable: ${database.MeasureTable}
CoreSetTable: ${database.CoreSetTable}
RateTable: ${database.RateTable}
RateTableName: ${database.RateTableName}

app-api:
path: services/app-api
params:
CoreSetTableName: ${database.CoreSetTableName}
CoreSetTable: ${database.CoreSetTable}
CoreSetTableStreamArn: ${database.CoreSetTableStreamArn}
MeasureTableName: ${database.MeasureTableName}
MeasureTable: ${database.MeasureTable}
CoreSetTable: ${database.CoreSetTable}
RateTable: ${database.RateTable}
MeasureTableStreamArn: ${database.MeasureTableStreamArn}
RateTableName: ${database.RateTableName}
RateTable: ${database.RateTable}
RateTableStreamArn: ${database.RateTableStreamArn}
BannerTableName: ${database.BannerTableName}

Expand Down
10 changes: 0 additions & 10 deletions services/app-api/handlers/kafka/post/postKafkaData.js

This file was deleted.

13 changes: 13 additions & 0 deletions services/app-api/handlers/kafka/post/postKafkaData.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import KafkaSourceLib from "../../../libs/kafka-source-lib";

// All QMR change-data-capture topics share this prefix and schema version.
const version = "v0";
const topicPrefix = "aws.mdct.qmr.cdc";

// DynamoDB tables whose stream events are forwarded to Kafka, mapped to
// their topic-name suffix. The non-null assertions assume the deployment
// always sets these env vars — TODO confirm against the serverless config.
const tables = [
  { sourceName: process.env.coreSetTable!, topicName: "coreSet" },
  { sourceName: process.env.measureTable!, topicName: "measure" },
  { sourceName: process.env.rateTable!, topicName: "rate" },
];

const postKafkaData = new KafkaSourceLib(topicPrefix, version, tables);

// Use an ES module export rather than mixing CommonJS `exports` with ESM
// imports; under CJS emit this compiles to the same `exports.handler`
// assignment, so the Lambda handler reference is unchanged. Bind so the
// method keeps `this` when Lambda invokes it detached from the instance.
export const handler = postKafkaData.handler.bind(postKafkaData);
129 changes: 0 additions & 129 deletions services/app-api/libs/kafka-source-lib.js

This file was deleted.

177 changes: 177 additions & 0 deletions services/app-api/libs/kafka-source-lib.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/* eslint-disable no-console */
import { unmarshall } from "@aws-sdk/util-dynamodb";
import { Kafka, Producer } from "kafkajs";

// Shape of a single Kafka message built from one DynamoDB stream record
// (see KafkaSourceLib.createDynamoPayload).
type KafkaPayload = {
  key: string;
  value: string;
  partition: number;
  headers: {
    eventName: string;
    eventTime?: string;
    eventID?: string;
  };
};
// Maps a DynamoDB table name (sourceName) to the Kafka topic suffix
// (topicName) its change events are published under.
type SourceTopicMapping = {
  sourceName: string;
  topicName: string;
};

// Module-scoped client and producer so a warm Lambda container reuses the
// Kafka connection across invocations; both are assigned in the
// KafkaSourceLib constructor.
let kafka: Kafka;
let producer: Producer;

class KafkaSourceLib {
  /*
   * Publishes DynamoDB stream records to Kafka topics.
   *
   * Event types:
   * cmd – command; restful publish
   * cdc – change data capture; record upsert/delete in data store
   * sys – system event; send email, archive logs
   * fct – fact; user activity, notifications, logs
   *
   * topicPrefix = "[data_center].[system_of_record].[business_domain].[event_type]";
   * version = "some version";
   * tables = [list of table mappings];
   */

  topicPrefix: string;
  version: string | null;
  tables: SourceTopicMapping[];
  connected: boolean; // flipped on first handler invocation after producer.connect()
  topicNamespace: string;
  stage: string;

  constructor(
    topicPrefix: string,
    version: string | null,
    tables: SourceTopicMapping[]
  ) {
    if (!process.env.BOOTSTRAP_BROKER_STRING_TLS) {
      throw new Error("Missing Broker Config. ");
    }
    // Setup vars
    this.stage = process.env.STAGE ? process.env.STAGE : "";
    this.topicNamespace = process.env.topicNamespace
      ? process.env.topicNamespace
      : "";
    this.topicPrefix = topicPrefix;
    this.version = version;
    this.tables = tables;

    // Guarded above, so no non-null assertion is needed here.
    const brokerStrings = process.env.BOOTSTRAP_BROKER_STRING_TLS;
    kafka = new Kafka({
      clientId: `qmr-${this.stage}`,
      brokers: brokerStrings.split(","),
      retry: {
        initialRetryTime: 300,
        retries: 8,
      },
      ssl: {
        rejectUnauthorized: false,
      },
    });

    // Disconnect the producer cleanly on shutdown. Remove any
    // previously-registered listener first so re-instantiation in a warm
    // container does not stack duplicate handlers. forEach (not map):
    // these loops are for side effects only.
    producer = kafka.producer();
    this.connected = false;
    const signalTraps = ["SIGTERM", "SIGINT", "SIGUSR2", "beforeExit"];
    signalTraps.forEach((type) => {
      process.removeListener(type, producer.disconnect);
    });
    signalTraps.forEach((type) => {
      process.once(type, producer.disconnect);
    });
  }

  /** JSON-serialize a value; pretty-print with 2-space indent when requested. */
  stringify(e: any, prettyPrint?: boolean) {
    if (prettyPrint === true) return JSON.stringify(e, null, 2);
    return JSON.stringify(e);
  }

  /**
   * Checks if a streamArn is a valid topic. Returns undefined otherwise
   * @param streamARN - DynamoDB streamARN
   * @returns the fully-qualified topic name, or undefined when no configured
   *   table name appears in the ARN
   */
  determineDynamoTopicName(streamARN: string) {
    for (const table of this.tables) {
      if (streamARN.includes(`/${table.sourceName}/`))
        return this.topic(table.topicName);
    }
    console.log(`Topic not found for table arn: ${streamARN}`);
  }

  /**
   * Unmarshall a DynamoDB attribute map; tolerates a missing image
   * (e.g. INSERT records have no OldImage) by returning an empty object.
   */
  unmarshall(r: any) {
    if (!r) return {};
    return unmarshall(r);
  }

  /** Convert one DynamoDB stream record into a Kafka message payload. */
  createDynamoPayload(record: any): KafkaPayload {
    const dynamodb = record.dynamodb;
    const { eventID, eventName } = record;
    const dynamoRecord = {
      NewImage: this.unmarshall(dynamodb.NewImage),
      OldImage: this.unmarshall(dynamodb.OldImage),
      Keys: this.unmarshall(dynamodb.Keys),
    };
    return {
      // Message key: the record's key attribute values joined with '#'.
      key: Object.values(dynamoRecord.Keys).join("#"),
      value: this.stringify(dynamoRecord),
      partition: 0,
      headers: { eventID: eventID, eventName: eventName },
    };
  }

  /** Build the namespaced topic name, appending the version suffix when set. */
  topic(t: string) {
    if (this.version) {
      return `${this.topicNamespace}${this.topicPrefix}.${t}.${this.version}`;
    } else {
      return `${this.topicNamespace}${this.topicPrefix}.${t}`;
    }
  }

  /**
   * Group stream records into sendBatch-ready entries keyed by topic.
   * Records whose source table is not in this.tables are skipped.
   */
  async createOutboundEvents(records: any[]) {
    const outboundEvents: { [key: string]: any } = {};
    for (const record of records) {
      // Was String(record.eventSourceARN.toString()) — redundant double
      // conversion; a single String() coercion is equivalent.
      const topicName = this.determineDynamoTopicName(
        String(record.eventSourceARN)
      );
      if (!topicName) continue;
      const payload = this.createDynamoPayload(record);

      //initialize configuration object keyed to topic for quick lookup
      if (!(outboundEvents[topicName] instanceof Object))
        outboundEvents[topicName] = {
          topic: topicName,
          messages: [],
        };

      //add messages to messages array for corresponding topic
      outboundEvents[topicName].messages.push(payload);
    }
    return outboundEvents;
  }

  /**
   * Lambda entry point: connects the producer once per warm container,
   * then batches and publishes all stream records in the event.
   */
  async handler(event: any) {
    if (!this.connected) {
      await producer.connect();
      this.connected = true;
    }

    // Warmup events have no records.
    if (!event.Records) {
      console.log("No records to process. Exiting.");
      return;
    }

    // if dynamo
    const outboundEvents = await this.createOutboundEvents(event.Records);

    const topicMessages = Object.values(outboundEvents);
    console.log(`Batch configuration: ${this.stringify(topicMessages, true)}`);

    if (topicMessages.length > 0) await producer.sendBatch({ topicMessages });
    console.log(`Successfully processed ${event.Records.length} records.`);
  }
}

export default KafkaSourceLib;
Loading

0 comments on commit 9c67301

Please sign in to comment.