Skip to content

Commit b2caafa

Browse files
authored
rotor: Add support for custom CA certificate in Kafka SSL configuration (#1175)
1 parent 4088245 commit b2caafa

File tree

1 file changed

+33
-10
lines changed

1 file changed

+33
-10
lines changed

services/rotor/src/lib/kafka-config.ts

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ import { Kafka, logLevel, CompressionCodecs, CompressionTypes } from "kafkajs";
22
import SnappyCodec from "kafkajs-snappy";
33
import "@sensejs/kafkajs-zstd-support";
44

5-
import { LogMessageBuilder, requireDefined, randomId, getLog } from "juava";
5+
import { readFileSync } from "fs";
6+
import { isTruish, LogMessageBuilder, requireDefined, randomId, getLog } from "juava";
67
import JSON5 from "json5";
78
const log = getLog("kafka");
89

@@ -25,7 +26,7 @@ function translateLevel(l: logLevel): LogMessageBuilder {
2526

2627
export type KafkaCredentials = {
2728
brokers: string[] | string;
28-
ssl?: boolean;
29+
ssl?: boolean | Record<string, any>;
2930
sasl?: {
3031
mechanism: "scram-sha-256" | "scram-sha-512";
3132
username: string;
@@ -34,9 +35,37 @@ export type KafkaCredentials = {
3435
};
3536

3637
export function getCredentialsFromEnv(): KafkaCredentials {
38+
const ssl = isTruish(process.env.KAFKA_SSL);
39+
const sslSkipVerify = isTruish(process.env.KAFKA_SSL_SKIP_VERIFY);
40+
41+
let sslOption: KafkaCredentials["ssl"] = undefined;
42+
43+
if (ssl) {
44+
if (sslSkipVerify) {
45+
// TLS enabled, but server TLS certificate is not verified
46+
sslOption = {
47+
rejectUnauthorized: false,
48+
checkServerIdentity: () => undefined,
49+
};
50+
} else if (process.env.KAFKA_SSL_CA) {
51+
// TLS enabled, server TLS certificate is verified using a custom CA certificate
52+
sslOption = {
53+
ca: process.env.KAFKA_SSL_CA,
54+
};
55+
} else if (process.env.KAFKA_SSL_CA_FILE) {
56+
// TLS enabled, server TLS certificate is verified using a custom CA certificate (loaded from a local file)
57+
sslOption = {
58+
ca: readFileSync(process.env.KAFKA_SSL_CA_FILE, "utf-8"),
59+
};
60+
} else {
61+
// TLS enabled, no extra configurations
62+
sslOption = true;
63+
}
64+
}
65+
3766
return {
3867
brokers: requireDefined(process.env.KAFKA_BOOTSTRAP_SERVERS, "env KAFKA_BOOTSTRAP_SERVERS is required").split(","),
39-
ssl: process.env.KAFKA_SSL === "true" || process.env.KAFKA_SSL === "1",
68+
ssl: sslOption,
4069
sasl: process.env.KAFKA_SASL ? JSON5.parse(process.env.KAFKA_SASL) : undefined,
4170
};
4271
}
@@ -57,13 +86,7 @@ export function connectToKafka(opts: { defaultAppId: string } & KafkaCredentials
5786
// },
5887
clientId: process.env.APPLICATION_ID || opts.defaultAppId,
5988
brokers: typeof opts.brokers === "string" ? opts.brokers.split(",") : opts.brokers,
60-
ssl: opts.ssl
61-
? {
62-
rejectUnauthorized: false,
63-
checkServerIdentity: () => undefined,
64-
}
65-
: undefined,
66-
89+
ssl: opts.ssl,
6790
...sasl,
6891
});
6992
}

0 commit comments

Comments
 (0)