-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathindex.ts
executable file
·134 lines (117 loc) · 4.97 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env node
import * as commanderProgram from "commander";
import debug from "debug";
import { existsSync, readFileSync } from "fs";
import { resolve } from "path";
import * as pino from "pino";
import { AnonKafkaMirror, mapMessage } from "./lib/AnonKafkaMirror";
import config from "./lib/config/default";
// Effective configuration: starts as the bundled default and is replaced or
// amended further down by the optional config files and CLI flags.
let localConfig = config;
const debugLogger = debug("anon-kafka-mirror:cli");

// CLI flags as [flags, description] pairs; they are registered in this order
// and then process.argv is parsed.
const cliOptionTable = [
  ["-b, --consumer-broker-list [string]",
    "The broker list string to consumer in the form HOST1:PORT1,HOST2:PORT2."],
  ["-p, --producer-broker-list [string]",
    "The broker list string to produce in the form HOST1:PORT1,HOST2:PORT2."],
  ["-t, --consumer-topic [string]", "Kafka topic to consume."],
  ["-n, --producer-topic [string]", "Kafka topic to produce."],
  ["-g, --consumer-group [string]", "Kafka consumer group."],
  ["-c, --config-file [string]", "Anon kafka config file."],
  ["-f, --topic-config-file [string]", "Anon kafka topic config file."],
  ["-l, --level [string]", "Log level (debug,info,warn,error)"],
  ["-d, --dry-run", "Just read message from stdin, convert, print output and exit."],
];
cliOptionTable.forEach(([optionFlags, optionHelp]) => {
  commanderProgram.option(optionFlags, optionHelp);
});
commanderProgram.parse(process.argv);
// Optional full-config override: when -c/--config-file names an existing
// path, load it through require() and use it in place of the default config.
// A load failure is reported on stderr and the previous config is kept.
const configFileArg = commanderProgram.configFile;
if (configFileArg && existsSync(configFileArg)) {
  debugLogger("Got config file", configFileArg);
  try {
    const absoluteConfigPath = resolve(configFileArg);
    // tslint:disable-next-line
    localConfig = require(absoluteConfigPath) as any;
  } catch (loadError) {
    console.error("Could not read config file", loadError);
  }
}
// Optional topic override: when -f/--topic-config-file names an existing
// path, parse it as JSON and store the result as localConfig.topic.
// A read or parse failure is reported on stderr and the previous topic
// configuration is kept.
const topicConfigFileArg = commanderProgram.topicConfigFile;
if (topicConfigFileArg && existsSync(topicConfigFileArg)) {
  debugLogger("Got topic config file", topicConfigFileArg);
  try {
    const rawTopicConfig = readFileSync(resolve(topicConfigFileArg), "utf8");
    localConfig.topic = JSON.parse(rawTopicConfig);
  } catch (e) {
    // Name the topic config file explicitly so this failure is not confused
    // with a failure to load the main config file.
    console.error("Could not read topic config file", e);
  }
}
// Logged unconditionally: shows the topic config in effect, whether it came
// from the topic file or from the config loaded earlier.
debugLogger("Loaded topic config file", localConfig.topic);
// The assembled config must carry a topic section plus consumer and producer
// sections that each have an `noptions` object; otherwise report the problem,
// dump the config and show the usage text through commander's help().
const consumerSection = localConfig.consumer;
const producerSection = localConfig.producer;
const hasRequiredSections =
  localConfig.topic &&
  consumerSection && consumerSection.noptions &&
  producerSection && producerSection.noptions;
if (!hasRequiredSections) {
  console.error("Config file does not contains topic, consumer or producer configurations.");
  console.error("Config:", localConfig);
  commanderProgram.help();
}
// -g/--consumer-group overrides "group.id" in both the consumer and the
// producer native options.
// NOTE(review): "group.id" is normally a consumer-side setting; confirm the
// producer assignment is intentional.
const groupIdOverride = commanderProgram.consumerGroup;
if (groupIdOverride) {
  debugLogger("Rewrite consumer group from arg", groupIdOverride);
  const consumerNativeOptions = localConfig.consumer.noptions;
  const producerNativeOptions = localConfig.producer.noptions;
  consumerNativeOptions["group.id"] = groupIdOverride;
  producerNativeOptions["group.id"] = groupIdOverride;
}
// Broker lists: a CLI flag (-b for the consumer, -p for the producer) replaces
// the configured "metadata.broker.list"; after the override each side must
// have a non-empty value, otherwise an error is printed and help() is shown.
const BROKER_LIST_KEY = "metadata.broker.list";

const consumerBrokersArg = commanderProgram.consumerBrokerList;
if (consumerBrokersArg) {
  debugLogger("Rewrite consumer broker list from arg", consumerBrokersArg);
  localConfig.consumer.noptions[BROKER_LIST_KEY] = consumerBrokersArg;
}
const consumerBrokers = localConfig.consumer.noptions[BROKER_LIST_KEY];
if (!consumerBrokers) {
  console.error("Consumer broker list is required.");
  commanderProgram.help();
}

const producerBrokersArg = commanderProgram.producerBrokerList;
if (producerBrokersArg) {
  debugLogger("Rewrite producer broker list from arg", producerBrokersArg);
  localConfig.producer.noptions[BROKER_LIST_KEY] = producerBrokersArg;
}
const producerBrokers = localConfig.producer.noptions[BROKER_LIST_KEY];
if (!producerBrokers) {
  console.error("Producer broker list is required.");
  commanderProgram.help();
}
// Topic names: -t/--consumer-topic overrides topic.name (which is mandatory
// after the override), and -n/--producer-topic overrides topic.newName
// (which is optional).
const sourceTopicArg = commanderProgram.consumerTopic;
if (sourceTopicArg) {
  debugLogger("Rewrite consumer topic name from arg", sourceTopicArg);
  localConfig.topic.name = sourceTopicArg;
}
const sourceTopicName = localConfig.topic.name;
if (!sourceTopicName) {
  console.error("Topic name is required.");
  commanderProgram.help();
}

const targetTopicArg = commanderProgram.producerTopic;
if (targetTopicArg) {
  debugLogger("Rewrite producer topic name from arg", targetTopicArg);
  localConfig.topic.newName = targetTopicArg;
}
// Logger setup: only when the config has a `logger` section. One pino logger
// is built from that section and a child logger is attached to the consumer
// and producer configs, tagged with the stream it belongs to.
if (localConfig.logger) {
  // The flag is declared as "-l, --level", so commander exposes its value as
  // `level`; reading `loglevel` never matched and the flag was ignored.
  const requestedLevel = commanderProgram.level;
  if (requestedLevel) {
    localConfig.logger.level = requestedLevel;
  }
  const rootLogger = pino(localConfig.logger);
  localConfig.consumer.logger = rootLogger.child({ stream: "consumer" });
  localConfig.producer.logger = rootLogger.child({ stream: "producer" });
}
// Entry point. The mirror is always constructed from the final config; with
// -d/--dry-run a single JSON message is read from stdin, mapped with the
// topic configuration, printed to stdout and the process exits. Without the
// flag the mirror is started.
const mirror = new AnonKafkaMirror(localConfig);
if (commanderProgram.dryRun) {
  const stdin = process.stdin;
  const stdout = process.stdout;
  const inputChunks = [];
  stdin.resume();
  stdin.setEncoding("utf8");
  stdin.on("data", (chunk) => {
    inputChunks.push(chunk);
  });
  stdin.on("end", () => {
    try {
      // Join with an empty separator: the default "," would insert commas at
      // chunk boundaries and corrupt input that arrives in several chunks.
      const inputJSON = inputChunks.join("");
      const parsedData = JSON.parse(inputJSON);
      const mappedJSON = mapMessage(localConfig.topic, parsedData);
      const outputJSON = JSON.stringify(mappedJSON, null, " ");
      stdout.write(outputJSON);
      stdout.write("\n");
    } catch (e) {
      // NOTE(review): the failure is printed to stdout and the process still
      // exits with status 0 below; confirm whether callers rely on that.
      console.log("Could not map message: ", e);
    }
    process.exit(0);
  });
} else {
  debugLogger("Start mirror");
  mirror.run();
}