Skip to content

Commit

Permalink
feat: Add benthos with auth kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
flemzord committed Sep 21, 2022
1 parent 00291e3 commit c131661
Showing 1 changed file with 73 additions and 0 deletions.
73 changes: 73 additions & 0 deletions benthos/config_sasl.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
http:
enabled: true
address: 0.0.0.0:4195

tracer:
jaeger:
agent_address: "${JAEGER_COLLECTOR}"
service_name: ${SERVICE_NAME}

input:
kafka:
addresses: [ ${KAFKA_ADDRESS} ]
topics: [ ${KAFKA_TOPIC} ]
target_version: ${KAFKA_VERSION}
consumer_group: ${KAFKA_CONSUMER_GROUP}
checkpoint_limit: 1024
tls:
enabled: true
skip_cert_verify: true
sasl:
mechanism: ${KAFKA_SASL_MECHANISM}
user: ${KAFKA_USER}
password: ${KAFKA_PASSWORD}

pipeline:
processors:
- json_schema:
schema: '{"type": "object"}'
- catch:
- log:
level: ERROR
message: "Schema validation failed due to: ${!error()}"
- bloblang: root = deleted()
- switch:
- check: this.type == "SAVED_METADATA"
processors:
- bloblang: |
import "/config/saved_metadata.blobl"
root = this.apply("savedMetadata")
- check: this.type == "COMMITTED_TRANSACTIONS"
processors:
- bloblang: |
import "/config/committed_transactions.blobl"
root = this.apply("committedTransactions")
- check: this.type == "SAVED_PAYMENT"
processors:
- bloblang: |
import "/config/saved_payment.blobl"
root = this.apply("savedPayment")
- processors:
- bloblang: |
root = deleted()
- catch:
- bloblang: root = deleted()
output:
processors:
- bloblang: |
root = "%s\n".format(this.map_each(v -> v.string()).join("\n"))
broker:
outputs:
- stdout: {}
- http_client:
url: ${OPENSEARCH_URL}/_bulk
verb: POST
headers:
Content-Type: application/x-ndjson
tls:
enabled: ${OPENSEARCH_TLS_ENABLED}
skip_cert_verify: ${OPENSEARCH_TLS_SKIP_CERT_VERIFY}
basic_auth:
enabled: ${OPENSEARCH_BASIC_AUTH_ENABLED}
username: ${OPENSEARCH_AUTH_USERNAME}
password: ${OPENSEARCH_AUTH_PASSWORD}

0 comments on commit c131661

Please sign in to comment.