Skip to content

Commit

Permalink
fix: allow users to use schemaless key (#20)
Browse files Browse the repository at this point in the history
* fix: allow users to use schemaless key

* chore: bump version
  • Loading branch information
EladLeev authored Mar 23, 2024
1 parent d97c9c5 commit 7dcdaa0
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 25 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ replay_pid*

.idea/
target/
.DS_Store
63 changes: 63 additions & 0 deletions dev/dev.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Development Environment

## DB Connection
Username: `sample`
Password: `sample`
DB Name: `sample`

## Create a new connector without the SMT
```bash
curl -X "POST" "http://localhost:8083/connectors" \
-H "Content-Type: application/json" \
-d $'{
"name": "jdbc-source",
"config": {
"connector.class": "JdbcSourceConnector",
"tasks.max": "1",
"connection.url":"jdbc:postgresql://db/sample?user=sample&password=sample",
"mode": "bulk",
"table.whitelist":"actor",
"validate.non.null":"false",
"topic.prefix":"postgres-jdbc-",
"name": "jdbc-source",
"transforms": "createKey,extractInt",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"actor_id",
"transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field":"actor_id"
}
}'
```

## Create a new connector with the SMT
```bash
curl -X "POST" "http://localhost:8083/connectors" \
-H "Content-Type: application/json" \
-d $'{
"name": "jdbc-source",
"config": {
"connector.class": "JdbcSourceConnector",
"tasks.max": "1",
"connection.url":"jdbc:postgresql://db/sample?user=sample&password=sample",
"mode": "timestamp",
"timestamp.column.name":"last_update",
"table.whitelist":"actor",
"validate.non.null":"false",
"topic.prefix":"postgres-jdbc-",
"name": "jdbc-source",
"transforms": "createKey,extractInt,keyToField",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"actor_id",
"transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field":"actor_id",
"transforms.keyToField.type": "com.github.eladleev.kafka.connect.transform.keytofield.KeyToFieldTransform",
"transforms.keyToField.field.name":"primaryKey",
"transforms.keyToField.field.delimiter": "_"
}
}'
```

## Known Issues
* `mujz/pagila` is not working well on Apple Silicon.
Make sure to use latest Docker version, and set "Use Rosetta for x86/amd64 emulation on Apple Silicon" to true.
If that's not possible, uncomment to use `synthesizedio/pagila` image instead.
38 changes: 16 additions & 22 deletions dev/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

version: '2'
services:
zookeeper:
Expand Down Expand Up @@ -48,6 +47,7 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,OPTIONS'
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker-1:9092'

connect:
image: confluentinc/cp-kafka-connect:7.6.0
Expand Down Expand Up @@ -79,6 +79,17 @@ services:
CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
volumes:
- ./kafka-connect:/etc/kafka-connect/jars
command:
- bash
- -c
- |
echo "Installing Connector"
confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.7.6
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
sleep infinity
rest-proxy:
image: confluentinc/cp-kafka-rest
Expand All @@ -101,6 +112,7 @@ services:

db:
image: mujz/pagila
# image: synthesizedio/pagila
environment:
- POSTGRES_PASSWORD=sample
- POSTGRES_USER=sample
Expand All @@ -118,25 +130,7 @@ services:
KAFKA_BROKERS: 'broker-1:9092'
KAFKA_SCHEMAREGISTRY_ENABLED: true
KAFKA_SCHEMAREGISTRY_URLS: 'http://schema_registry:8081'
CONNECT_ENABLED: true
CONNECT_CLUSTERS_NAME: 'connect'
CONNECT_CLUSTERS_URL: 'http://connect:8083'
restart: unless-stopped

connect-ui:
image: landoop/kafka-connect-ui
container_name: connect-ui
depends_on:
- connect
ports:
- "8001:8000"
environment:
- "CONNECT_URL=http://connect:8083"

schema-registry-ui:
image: landoop/schema-registry-ui
hostname: schema-registry-ui
depends_on:
- broker-1
- schema_registry
ports:
- "8002:8000"
environment:
SCHEMAREGISTRY_URL: 'http://${DOCKER_HOST_IP}:8081'
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<!-- Define project coordinates -->
<groupId>com.github.eladleev.kafka.connect.transform</groupId>
<artifactId>key-to-field-transform</artifactId>
<version>1.0.0</version>
<version>1.0.1</version>
<packaging>jar</packaging>

<!-- Define project properties -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ private R applyWithSchema(R record) {
private String extractKeyAsString(Schema schema, Object key) {
LOGGER.trace("Extracting key as string");


if (!(key instanceof Struct)) {
throw new IllegalArgumentException("Key must be of type Struct");
return key.toString();
}

Struct keyStruct = (Struct) key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void applyWithComplexSchemaTest() {
System.out.println("[Test] applyWithComplexSchemaTest Done");
}

@Test(expected = IllegalArgumentException.class)
@Test
public void applyWithNonStructKeySchemaTest() {
xform.configure(Collections.singletonMap("field.name", "primaryKey"));

Expand Down

0 comments on commit 7dcdaa0

Please sign in to comment.