Skip to content

Commit

Permalink
Merge pull request #255 from buildable/sandbox
Browse files Browse the repository at this point in the history
Sandbox -> Main
  • Loading branch information
paulkr authored May 17, 2023
2 parents d4c4b5a + 632efa7 commit 4a0e5c4
Show file tree
Hide file tree
Showing 40 changed files with 3,346 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"class-methods-use-this": ["off"],
"consistent-return": ["off"],
"newIsCap": ["off"],

"no-param-reassign": ["off"],
"import/prefer-default-export": ["off"],
"import/extensions": ["off"]
}
Expand Down
2 changes: 1 addition & 1 deletion catalog/destinations/bigquery/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"type": "bigquery",
"category": "database",
"image": "https://assets.buildable.dev/catalog/node-templates/bigquery.svg",
"tags": ["database", "db", "warehouse", "analytics", "olap"],
"tags": ["database", "warehouse"],
"authentication": [
{
"name": "GOOGLE_SERVICE_ACCOUNT_KEY",
Expand Down
12 changes: 12 additions & 0 deletions catalog/destinations/kafka/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# 📣 Change Log
All notable changes to the `Kafka` Destination Connection will be documented in this file.

The format followed is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/).

---

## [1.0.0] - 2023-04-18

⚡️ Initial Version

---
43 changes: 43 additions & 0 deletions catalog/destinations/kafka/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"title": "Kafka",
"description": "Kafka is an open-source streaming platform used for building real-time data pipelines and streaming applications.",
"apiVersion": "NA",
"type": "kafka",
"category": "streaming",
"image": "https://assets.buildable.dev/catalog/node-templates/kafka.svg",
"tags": ["streaming"],
"authentication": [
{
"name": "KAFKA_BROKER_URLS",
"label": "Enter your Kafka Broker Nodes URLs, separated by comma (,)",
"placeholder": "kafka-broker1.dev:9092,kafka-broker2.dev:9092,kafka-broker3.dev:9092"
},
{
"name": "KAFKA_USERNAME",
"label": "Enter your Kafka Username",
"placeholder": "myusername"
},
{
"name": "KAFKA_PASSWORD",
"label": "Enter your Kafka Password",
"placeholder": "mypassword"
},
{
"name": "KAFKA_CLIENT_ID",
"label": "Enter a Client ID for your connection",
"placeholder": "myclientid",
"value": "event"
}
],
"eventSchema": {},
"settings": {
"createBuildableEnvVars": true,
"hasEvents": false,
"showEvents": false
},
"paths": null,
"events": [],
"connectionTypes": ["target"],
"actions": ["pushData"],
"destinationType": "http"
}
23 changes: 23 additions & 0 deletions catalog/destinations/kafka/docs/connect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
## Connect

Kafka is an open-source streaming platform used for building real-time data pipelines and streaming applications.

### IP Whitelisting

To enable Buildable to connect to your Kafka Server, please ensure that your firewall accepts incoming requests from the following IP addresses:

`35.245.232.82` `35.245.100.81`

`34.120.49.202` `34.160.218.232`

`3.12.101.201` `3.129.238.32`

`3.13.190.25` `3.133.23.83`

### Securely Encrypted

Rest assured, your credentials are securely encrypted to keep your information safe.

### Need Help?

Start a conversation in our [Discord Server](https://discord.com/invite/47AJ42Wzys) or send an email to [support@buildable.dev](mailto:https://discord.com/invite/47AJ42Wzys). Our Data Engineers are available 24/7 to help if you ever get stuck.
15 changes: 15 additions & 0 deletions catalog/destinations/kafka/docs/setup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
## Kafka Destination Setup

Check out our [quick start guide](https://docs.buildable.dev/) to learn how to get started.

### IP Whitelisting

To enable Buildable to connect to your Kafka Server, please ensure that your firewall accepts incoming requests from the following IP addresses:

`35.245.232.82` `35.245.100.81`

`34.120.49.202` `34.160.218.232`

`3.12.101.201` `3.129.238.32`

`3.13.190.25` `3.133.23.83`
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
### Push Data

Allows to push data into a Kafka topic

[Documentation](https://kafka.apache.org/quickstart)

**Types**

```typescript
interface IKafkaPushData {
topic: string;
data: string | string[] | Buffer | Buffer[] | AnyObject | AnyObject[];
headers?: AnyObject;
partition?: number;
key?: string;
timestamp?: number | string;
}
```

**Sample Payload**
```json
{
"topic": "topic_0",
"data": [{
"id": 1,
"name": "John Doe"
}],
"headers": {
"header1": "value1",
"header2": "value2"
},
"partition": 3,
"key": "1"
}
```

**Sample Response**
```json
[
{
"topicName": "topic_0",
"partition": 3,
"errorCode": 0,
"baseOffset": "2",
"logAppendTime": "-1",
"logStartOffset": "0"
}
]
```
3 changes: 3 additions & 0 deletions catalog/destinations/kafka/docs/transformations/footer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Need Help?

Start a conversation in our [Discord Server](https://discord.com/invite/47AJ42Wzys) or send an email to [support@buildable.dev](mailto:https://discord.com/invite/47AJ42Wzys). Our Data Engineers are available 24/7 to help if you ever get stuck.
16 changes: 16 additions & 0 deletions catalog/destinations/kafka/docs/transformations/header.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
## Documentation
----

Kafka is an open-source streaming platform used for building real-time data pipelines and streaming applications

### IP Whitelisting

To enable Buildable to connect to your Kafka Server, please ensure that your firewall accepts incoming requests from the following IP addresses:

`35.245.232.82` `35.245.100.81`

`34.120.49.202` `34.160.218.232`

`3.12.101.201` `3.129.238.32`

`3.13.190.25` `3.133.23.83`
13 changes: 13 additions & 0 deletions catalog/destinations/kafka/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import getProxyDriver from "./kafka";

export async function main({ payload, config, action }) {
try {
const driver = getProxyDriver(config);

const data = await driver[action](payload);

return { data, status: 200 };
} catch (error) {
throw error;
}
}
177 changes: 177 additions & 0 deletions catalog/destinations/kafka/kafka.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
import crypto from "crypto";

import { Kafka, Producer } from "kafkajs";

import { AnyObject, DestinationClassI, TestConnection, Truthy } from "../../../types/destinationClassDefinition";
import { IKafkaPushData } from "./lib/types";

function generateRandomHexString(n: number): string {
const bytes = crypto.randomBytes(n / 2);
return bytes.toString("hex");
}

export class KafkaDriver implements DestinationClassI {
public client: Kafka = null;

public producer: Producer = null;

public readonly KAFKA_BROKER_URLS: string;

public readonly KAFKA_USERNAME: string;

public readonly KAFKA_PASSWORD: string;

public readonly KAFKA_CLIENT_ID: string | null = null;

constructor({ KAFKA_BROKER_URLS, KAFKA_USERNAME, KAFKA_PASSWORD, KAFKA_CLIENT_ID }: AnyObject) {
this.KAFKA_BROKER_URLS = KAFKA_BROKER_URLS;
this.KAFKA_USERNAME = KAFKA_USERNAME;
this.KAFKA_PASSWORD = KAFKA_PASSWORD;
this.KAFKA_CLIENT_ID = KAFKA_CLIENT_ID;
}

async connect(config?: AnyObject): Promise<void | Truthy> {
const { KAFKA_BROKER_URLS, KAFKA_USERNAME, KAFKA_PASSWORD } = config || this;

this.client = new Kafka({
clientId: this.KAFKA_CLIENT_ID || `event-${generateRandomHexString(8)}`,
brokers: KAFKA_BROKER_URLS.split(",").map((url) => url.trim()),
ssl: true,
sasl: {
mechanism: "plain",
username: KAFKA_USERNAME,
password: KAFKA_PASSWORD,
},
logLevel: 1,
connectionTimeout: 5000,
});

this.producer = this.client.producer();
await this.producer.connect();
}

async disconnect(): Promise<void | Truthy> {
if (this.producer) {
await this.producer.disconnect();
this.producer = null;
}

if (this.client) {
this.client = null;
}
}

async testConnection(): Promise<TestConnection> {
// If the client is not yet initialized, initialize it
if (!this.client) {
this.client = new Kafka({
clientId: this.KAFKA_CLIENT_ID || `event-${generateRandomHexString(8)}`,
brokers: this.KAFKA_BROKER_URLS.split(",").map((url) => url.trim()),
ssl: true,
sasl: {
mechanism: "plain",
username: this.KAFKA_USERNAME,
password: this.KAFKA_PASSWORD,
},
logLevel: 1,
connectionTimeout: 5000,
});
}

// Test the connection
try {
const admin = this.client.admin();
await admin.connect();
await admin.disconnect();

return {
success: true,
message: "Connection established successfully",
};
} catch (e) {
return {
success: false,
message: `Kafka connection failed: ${e.message}`,
};
}
}

/**
* Push data to Kafka
* @param topic - Kafka topic
* @param data - Data to push to Kafka
* @param headers - Headers to send with the message
* @param partition - Partition to send the message to
* @param key - Key to send with the message
* @param timestamp - Timestamp in UTC format
*/
async pushData({ topic, data, headers, partition, key, timestamp }: IKafkaPushData) {
timestamp = timestamp ? timestamp.toString() : undefined;

return this.producer.send({
topic,
messages: [{
value: JSON.stringify(data),
headers,
partition,
key,
timestamp,
}],
});
}
}

export default function getProxyDriver(config: AnyObject) {
const driver = new KafkaDriver(config);

return new Proxy(driver, {
get: (target, prop) => {
// return the client
if (prop === "client") {
return driver.client;
}

// return the producer
if (prop === "producer") {
return driver.producer;
}

if (typeof driver[prop] === "function") {
if (prop === "testConnection") {
return async () => driver.testConnection();
}

// Force the proxy to return a Promise that only resolves once the connection has been established
if (prop === "connect") {
return async () => {
await driver.connect(config);
};
}

// Force the proxy to return a Promise that only resolves once the connection has been dropped
if (prop === "disconnect") {
return async () => {
await driver.disconnect();
};
}

return async (payload) => {
try {
await driver.connect(config);

const result = await target[prop](payload);

await driver.disconnect();

return result;
} catch (err) {
console.log("Error occurred ===> ", err);
throw err;
}
};
}

throw new Error(`Method ${prop as string}() not found`);
},
});
}
10 changes: 10 additions & 0 deletions catalog/destinations/kafka/lib/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { AnyObject } from "../../../../types/destinationClassDefinition";

export interface IKafkaPushData {
topic: string;
data: string | string[] | Buffer | Buffer[] | AnyObject | AnyObject[];
headers?: AnyObject;
partition?: number;
key?: string;
timestamp?: number | string;
}
Loading

0 comments on commit 4a0e5c4

Please sign in to comment.