Debezium Engine based Change Data Capture (CDC) source.
The Debezium Source allows capturing database change events and streaming them over different message binders, such as Apache Kafka, RabbitMQ, and all other brokers supported by Spring Cloud Stream.
Note: This source can be used with any Spring Cloud Stream message binder. It is neither restricted to nor dependent on the Kafka Connect framework. Although this approach is flexible, it comes with certain limitations.
All Debezium configuration properties are supported.
Just prepend the debezium.properties. prefix to any Debezium property.
For example, to set Debezium's connector.class property, use the debezium.properties.connector.class source property instead.
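The prefixing rule can be sketched in a few lines of Python. This is only an illustration: the to_source_args helper and the sample property values are hypothetical and are not part of the Debezium Source itself.

```python
# Hypothetical helper: turns plain Debezium property names into
# command-line arguments for the Debezium Source.
PREFIX = "debezium.properties."


def to_source_args(debezium_props):
    """Return one --debezium.properties.<name>=<value> argument per property."""
    return [f"--{PREFIX}{name}={value}" for name, value in debezium_props.items()]


# Sample values chosen for illustration only.
args = to_source_args({
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": 3306,
})
for arg in args:
    print(arg)
```

Each printed line has the same shape as the command-line options used throughout this document.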
The Debezium Source currently supports CDC for multiple datastores: MySQL, PostgreSQL, MongoDB, Oracle, SQL Server, Db2, Vitess and Spanner databases.
Debezium provides a comprehensive message format that accurately details the changes that happen in the system.
Sometimes, though, this format is not suitable for downstream consumers, which might require messages whose field names and values are presented in a simplified, flattened structure.
To simplify the format of the event records that the Debezium connectors produce, you can use the Debezium event flattening message transformation. Using the flattening configuration, you can configure a simple message format like this:
--debezium.properties.transforms=unwrap
--debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
--debezium.properties.transforms.unwrap.drop.tombstones=false
--debezium.properties.transforms.unwrap.delete.handling.mode=rewrite
--debezium.properties.transforms.unwrap.add.fields=name,db
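To picture what these settings aim for, here is a rough Python sketch of the flattening idea. It is not the ExtractNewRecordState implementation: the flatten function and the sample event are hypothetical and heavily simplified, and the field naming (a double-underscore prefix for added metadata, a __deleted marker in rewrite mode) follows the behaviour described in the Debezium documentation as an assumption.

```python
def flatten(event, add_fields=("name", "db")):
    """Simplified illustration of event flattening (not Debezium's code)."""
    deleted = event["op"] == "d"
    # A delete has no "after" state, so rewrite mode keeps the "before" state.
    row = dict(event["before"] if deleted else event["after"])
    # add.fields=name,db copies metadata from the event's source block.
    for field in add_fields:
        row[f"__{field}"] = event["source"][field]
    # delete.handling.mode=rewrite flags the record instead of dropping it.
    row["__deleted"] = "true" if deleted else "false"
    return row


# Hypothetical, heavily trimmed change event for an update.
sample_event = {
    "op": "u",
    "before": {"id": 1001, "email": "old@example.com"},
    "after": {"id": 1001, "email": "new@example.com"},
    "source": {"name": "my-topic", "db": "inventory"},
}
flat = flatten(sample_event)
print(flat)
```

The consumer then sees a single flat record with the new row state plus the requested metadata, rather than the nested before/after envelope.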
When a Debezium source runs, it reads information from the source and periodically records offsets
that define how much of that information it has processed.
Should the source be restarted, it will use the last recorded offset to know where in the source information it should resume reading.
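The resume behaviour described above can be pictured with a toy Python sketch. It is a conceptual illustration only, not how the Debezium engine is implemented: the ToySource class, its flush_every parameter and the list standing in for a change log are all hypothetical.

```python
class ToySource:
    """Reads a change log and periodically records how far it got."""

    def __init__(self, log, offset_store, flush_every=2):
        self.log = log
        self.offset_store = offset_store  # stands in for the offset backing store
        self.flush_every = flush_every

    def run(self, max_events):
        # Resume right after the last recorded offset (0 on the first start).
        position = self.offset_store.get("offset", 0)
        emitted = []
        for _ in range(max_events):
            if position >= len(self.log):
                break
            emitted.append(self.log[position])
            position += 1
            # Offsets are flushed periodically, not after every event.
            if position % self.flush_every == 0:
                self.offset_store["offset"] = position
        return emitted


log = ["insert-1", "update-1", "insert-2", "delete-1", "insert-3"]
store = {}  # a dict stands in for a store that survives restarts

first_run = ToySource(log, store).run(max_events=3)    # stops after 3 events
second_run = ToySource(log, store).run(max_events=10)  # simulated restart
print(first_run)
print(second_run)
```

In this sketch the third event is read again after the restart because it was processed after the last flush. This is why the choice of offset store and flush interval matters: anything handled since the last recorded offset is read once more.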
Out of the box, the following offset storage configuration options are provided:
- In-Memory
Doesn't persist the offset data but keeps it in memory. Therefore, all offsets are lost when the Debezium Source restarts.
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
- Local Filesystem
Store the offsets in a file on the local file system (the file can be named anything and stored anywhere). Additionally, although the connector records the offsets with every source record it produces, the engine flushes the offsets to the backing store periodically (in the example below, once each minute).
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
--debezium.properties.offset.storage.file.filename=/tmp/offsets.dat # (1)
--debezium.properties.offset.flush.interval.ms=60000 # (2)
(1) Path to file where offsets are to be stored. Required when offset.storage is set to the FileOffsetBackingStore.
(2) Interval at which to try committing offsets. The default is 1 minute.
- Kafka topic
Uses a Kafka topic to store offset data.
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore
--debezium.properties.offset.storage.topic=my-kafka-offset-topic # (1)
--debezium.properties.offset.storage.partitions=2 # (2)
--debezium.properties.offset.storage.replication.factor=1 # (3)
--debezium.properties.offset.flush.interval.ms=60000 # (4)
(1) The name of the Kafka topic where offsets are to be stored. Required when offset.storage is set to the KafkaOffsetBackingStore.
(2) The number of partitions used when creating the offset storage topic.
(3) Replication factor used when creating the offset storage topic.
(4) Interval at which to try committing offsets. The default is 1 minute.
You can also implement the org.apache.kafka.connect.storage.OffsetBackingStore interface to provide offset storage backed by a custom key-value store.
The table below lists all available Debezium properties for each connector.
To use these properties, add the debezium.properties. prefix to them.
The Debezium integration tests use database fixtures running on the local machine. They leverage pre-built Debezium Docker database images with the help of Testcontainers.
To run and debug the tests from your IDE, you need to deploy the required database images from the command line. The instructions below explain how to run pre-configured test databases from Docker images.
Start the debezium/example-mysql image in Docker:
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.3.3.Final
Tip: (Optional) Use the mysql client to connect to the database and create a debezium user with the required credentials:
docker run -it --rm --name mysqlterm --link mysql mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
Use the following properties to connect the Debezium Source to the MySQL database:
debezium.properties.connector.class=io.debezium.connector.mysql.MySqlConnector # (1)
debezium.properties.name=my-connector # (2)
debezium.properties.topic.prefix=my-topic # (2)
debezium.properties.database.server.id=85744 # (2)
debezium.properties.database.user=debezium # (3)
debezium.properties.database.password=dbz # (3)
debezium.properties.database.hostname=localhost # (3)
debezium.properties.database.port=3306 # (3)
debezium.properties.schema=true # (4)
debezium.properties.key.converter.schemas.enable=true # (4)
debezium.properties.value.converter.schemas.enable=true # (4)
debezium.properties.transforms=unwrap # (5)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState # (5)
debezium.properties.transforms.unwrap.add.fields=name,db # (5)
debezium.properties.transforms.unwrap.delete.handling.mode=none # (5)
debezium.properties.transforms.unwrap.drop.tombstones=true # (5)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory # (6)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore # (6)
(1) Configures the Debezium Source to use MySqlConnector.
(2) Metadata used to identify and dispatch the incoming events.
(3) Connection to the MySQL server running on localhost:3306 as the debezium user.
(4) Includes the Change Event Value schema in the ChangeEvent message.
(5) Enables the Change Event Flattening.
(6) Source state to preserve between multiple starts.
You can also run the DebeziumDatabasesIntegrationTest#mysql() test using this MySQL configuration.
Note: Disable the mysql GenericContainer test initialization code.
Start a pre-configured PostgreSQL server from the debezium/example-postgres:2.3.3.Final Docker image:
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:2.3.3.Final
You can connect to this server like this:
psql -U postgres -h localhost -p 5432
Use the following properties to connect the Debezium Source to PostgreSQL:
debezium.properties.connector.class=io.debezium.connector.postgresql.PostgresConnector # (1)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory # (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore # (2)
debezium.properties.topic.prefix=my-topic # (3)
debezium.properties.name=my-connector # (3)
debezium.properties.database.server.id=85744 # (3)
debezium.properties.database.user=postgres # (4)
debezium.properties.database.password=postgres # (4)
debezium.properties.database.dbname=postgres # (4)
debezium.properties.database.hostname=localhost # (4)
debezium.properties.database.port=5432 # (4)
debezium.properties.schema=true # (5)
debezium.properties.key.converter.schemas.enable=true # (5)
debezium.properties.value.converter.schemas.enable=true # (5)
debezium.properties.transforms=unwrap # (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState # (6)
debezium.properties.transforms.unwrap.add.fields=name,db # (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none # (6)
debezium.properties.transforms.unwrap.drop.tombstones=true # (6)
(1) Configures the Debezium Source to use PostgresConnector.
(2) Configures the Debezium engine to use in-memory stores.
(3) Metadata used to identify and dispatch the incoming events.
(4) Connection to the PostgreSQL server running on localhost:5432 as the postgres user.
(5) Includes the Change Event Value schema in the message.
(6) Enables the Change Event Flattening.
You can also run the DebeziumDatabasesIntegrationTest#postgres() test using this PostgreSQL configuration.
Note: Disable the postgres GenericContainer test initialization code.
Start a pre-configured MongoDB server from the debezium/example-mongodb:2.3.3.Final container image:
docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz debezium/example-mongodb:2.3.3.Final
Initialize the inventory collections:
docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'
In the mongodb terminal output, search for a log entry like host: "3f95a8a6516e:27017":
2019-01-10T13:46:10.004+0000 I COMMAND [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms
Add the entry 127.0.0.1 3f95a8a6516e to your /etc/hosts file, using the host name from your own log output.
Use the following properties to connect the Debezium Source to MongoDB:
debezium.properties.connector.class=io.debezium.connector.mongodb.MongoDbConnector # (1)
debezium.properties.topic.prefix=my-topic
debezium.properties.name=my-connector
debezium.properties.database.server.id=85744
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory # (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore # (2)
debezium.properties.mongodb.hosts=rs0/localhost:27017 # (3)
debezium.properties.topic.prefix=dbserver1 # (3)
debezium.properties.mongodb.user=debezium # (3)
debezium.properties.mongodb.password=dbz # (3)
debezium.properties.database.include.list=inventory # (3)
debezium.properties.tasks.max=1 # (4)
debezium.properties.schema=true # (5)
debezium.properties.key.converter.schemas.enable=true # (5)
debezium.properties.value.converter.schemas.enable=true # (5)
debezium.properties.transforms=unwrap # (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState # (6)
debezium.properties.transforms.unwrap.add.fields=name,db # (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none # (6)
debezium.properties.transforms.unwrap.drop.tombstones=true # (6)
(1) Configures the Debezium Source to use the MongoDB connector.
(2) Configures the Debezium engine to use in-memory stores.
(3) Connection to the MongoDB server running on localhost:27017 as the debezium user.
(4) Limits the connector to a single task.
(5) Includes the Change Event Value schema in the SourceRecord events.
(6) Enables the Change Event Flattening.
You can also run the DebeziumDatabasesIntegrationTest#mongodb() test using this MongoDB configuration.
Start a SQL Server instance from the microsoft/mssql-server-linux:2017-CU9-GDR2 Docker image:
docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2
Populate it with sample data from the Debezium SQL Server tutorial:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
Use the following properties to connect the Debezium Source to SQL Server:
debezium.properties.connector.class=io.debezium.connector.sqlserver.SqlServerConnector # (1)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory # (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore # (2)
debezium.properties.topic.prefix=my-topic # (3)
debezium.properties.name=my-connector # (3)
debezium.properties.database.server.id=85744 # (3)
debezium.properties.database.user=sa # (4)
debezium.properties.database.password=Password! # (4)
debezium.properties.database.dbname=testDB # (4)
debezium.properties.database.hostname=localhost # (4)
debezium.properties.database.port=1433 # (4)
(1) Configures the Debezium Source to use SqlServerConnector.
(2) Configures the Debezium engine to use in-memory state stores.
(3) Metadata used to identify and dispatch the incoming events.
(4) Connection to the SQL Server running on localhost:1433 as the sa user.
You can also run the DebeziumDatabasesIntegrationTest#sqlServer() test using this SQL Server configuration.
Start an Oracle instance that is reachable from localhost and set it up with the configuration, users, and grants described in the Debezium Vagrant set-up.
Populate it with sample data from the Debezium Oracle tutorial:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
java -jar debezium-source.jar \
  --debezium.properties.connector.class=io.debezium.connector.mysql.MySqlConnector \
  --debezium.properties.topic.prefix=my-topic \
  --debezium.properties.name=my-connector \
  --debezium.properties.database.server.id=85744 \
  --debezium.properties.database.user=debezium \
  --debezium.properties.database.password=dbz \
  --debezium.properties.database.hostname=localhost \
  --debezium.properties.database.port=3306 \
  --debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory \
  --debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore