Skip to content

Commit

Permalink
Add Support for Auth/NoAuth with/without Encryption in Kafka with int…
Browse files Browse the repository at this point in the history
…egration tests (opensearch-project#3042)

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
  • Loading branch information
kkondaka and Krishna Kondaka authored Jul 20, 2023
1 parent c91b1fb commit b003b08
Show file tree
Hide file tree
Showing 6 changed files with 554 additions and 21 deletions.
41 changes: 37 additions & 4 deletions data-prepper-plugins/kafka-plugins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ log-pipeline:

- `session_timeout` (Optional) : The timeout used to detect client failures when using Kafka's group management. It is used for the rebalance.

- `max_retry_delay` (Optional) : By default the Kafka source will retry for every 1 second when there is a buffer write error. Defaults to `1s`.
- `max_retry_delay` (Optional) : By default the Kafka source will retry for every 1 second when there is a buffer write error. Defaults to `1s`.

- `auto_offset_reset` (Optional) : automatically reset the offset to the earliest or latest offset. Defaults to `earliest`.

Expand All @@ -89,7 +89,7 @@ Defaults to `4s`.

- `buffer_default_timeout` (Optional) : The maximum time to write data to the buffer. Defaults to `1s`.

- `fetch_max_bytes` (Optional) : The maximum record batch size accepted by the broker.
- `fetch_max_bytes` (Optional) : The maximum record batch size accepted by the broker.
Defaults to `52428800`.

- `fetch_max_wait` (Optional) : The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement. Defaults to `500`.
Expand Down Expand Up @@ -127,7 +127,7 @@ Defaults to `52428800`.
- `oauth_login_grant_type` (Optional) : This grant type refers to the way an application gets an access token.

- `oauth_login_scope` (Optional) : This scope limit an application's access to a user's account.

- `oauth_introspect_server` (Optional) : The URL of the introspect server. Most of the cases it should be similar to the oauth_login_server URL (Eg:https://dev.okta.com)

- `oauth_introspect_endpoint` (Optional) : The end point of the introspect server URL.(Eg: /oauth2/default/v1/introspect)
Expand All @@ -140,9 +140,42 @@ Defaults to `52428800`.

- `oauth_jwks_endpoint_url` (Optional) : The absolute URL for the oauth token refresh.

## Integration Tests

Before running the integration tests, make sure Kafka server is started
1. Start Zookeeper
```
bin/zookeeper-server-start.sh config/zookeeper.properties
```
2. Start Kafka Server with the following configuration
Configuration in config/server.properties
```
isteners=SASL_SSL://localhost:9093,PLAINTEXT://localhost:9092,SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
ssl.truststore.location=<location of truststore>
ssl.truststore.password=<password of truststore>
ssl.keystore.location=<location of keystore>
ssl.keystore.password=<password of keystore>
```
The truststore must have "localhost" certificates in them.

Command to start kafka server
```
bin/kafka-server-start.sh config/server.properties
```

3. Command to run integration tests

```
./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers="localhost:9092" -Dtests.kafka.trust_store_location="/home/krishkdk/kafka/kafka-3.4.1-src/sec/client.truststore.jks" -Dtests.kafka.trust_store_password="kafkaks" -Dtests.kafka.saslssl_bootstrap_servers="localhost:9093" -Dtests.kafka.ssl_bootstrap_servers="localhost:9094" -Dtests.kafka.saslplain_bootstrap_servers="localhost:9095" -Dtests.kafka.username="admin" -Dtests.kafka.password="admin1" --tests "*KafkaSourceMultipleAuthTypeIT*"
```


## Developer Guide

This plugin is compatible with Java 11. See

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
11 changes: 9 additions & 2 deletions data-prepper-plugins/kafka-plugins/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ dependencies {
testImplementation 'org.apache.kafka:kafka_2.13:3.4.0:test'
testImplementation 'org.apache.curator:curator-test:5.5.0'
testImplementation 'io.confluent:kafka-schema-registry:7.4.0'
testImplementation 'junit:junit:4.13.1'
testImplementation testLibs.junit.vintage
testImplementation 'org.apache.kafka:kafka-clients:3.4.0:test'
testImplementation 'org.apache.kafka:connect-json:3.4.0'
}
Expand All @@ -52,7 +52,6 @@ sourceSets {
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
//resources.srcDir file('src/integrationTest/resources')
}
}

Expand All @@ -67,6 +66,14 @@ task integrationTest(type: Test) {

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.kafka.bootstrap_servers', System.getProperty('tests.kafka.bootstrap_servers')
systemProperty 'tests.kafka.saslssl_bootstrap_servers', System.getProperty('tests.kafka.saslssl_bootstrap_servers')
systemProperty 'tests.kafka.ssl_bootstrap_servers', System.getProperty('tests.kafka.ssl_bootstrap_servers')
systemProperty 'tests.kafka.saslplain_bootstrap_servers', System.getProperty('tests.kafka.saslplain_bootstrap_servers')
systemProperty 'tests.kafka.username', System.getProperty('tests.kafka.username')
systemProperty 'tests.kafka.password', System.getProperty('tests.kafka.password')

filter {
includeTestsMatching '*IT'
}
Expand Down
Loading

0 comments on commit b003b08

Please sign in to comment.