Confluent delivers a modern approach to break down data silos with streaming data pipelines and enables organizations with fully governed real-time data flows that can be shaped into multiple contexts while in motion, so different teams can gain self-service access to find, browse, create, share and reuse data, wherever and whenever it’s needed.
Enterprises can use Confluent’s real-time change data capture (CDC) capabilities to continuously capture database changes as streams, then combine, enrich, and analyze them with other data streams while in motion, before they reach at-rest systems like a database or data warehouse. This lets engineers build applications directly on the data pipeline and extend it to more real-time use cases.
In addition, by continuously synchronizing change data across multiple systems automatically, organizations can use Confluent’s CDC capabilities to power data infrastructure and application modernization initiatives. By enabling multiple applications and systems across the organization to have a consistent, up-to-date view of data, Confluent helps enterprises unlock the full value of their data, allowing it to have a network effect.
By promoting data reusability, engineering agility and greater collaboration and trust, more teams can confidently do more with their data and bring data products to market faster.
This demo walks you through building streaming data pipelines with Confluent Cloud. You'll learn about:
- Confluent’s fully managed PostgreSQL CDC Source and Oracle CDC Source Premium connectors to stream products, orders, customers, and demographics data in real time to Confluent Cloud
- ksqlDB to process and enrich data in real time, generating a unified view of customers’ shopping habits
- Snowflake and Amazon Redshift Sink connectors to load the enriched data into data warehouses for subsequent analytics and reporting
This demo utilizes two fully-managed source connectors (Oracle CDC Source Premium and PostgreSQL CDC Source) and two fully-managed sink connectors (Snowflake and Amazon Redshift).
To complete this demo successfully, you need to install a few tools before getting started.
Note: This demo was built and validated on a Mac (x86).
- git
- Docker
- Terraform
- Special instructions for Apple Silicon users are here
- Install Confluent Cloud CLI by following the instructions here.
- Python 3.9.x
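Before moving on, it can help to confirm that the tools listed above are reachable from your shell. The following is a minimal sketch, not part of the demo repo. The command names (`docker`, `terraform`, `confluent`, `python3`) are assumptions about how each tool is exposed on your PATH, and treating 3.9 as a minimum version is also an assumption.

```python
import shutil
import sys

# Assumed executable names for the tools listed above.
REQUIRED_TOOLS = ["git", "docker", "terraform", "confluent", "python3"]


def missing_tools(tools):
    """Return the tools from `tools` that cannot be found on PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


def python_version_ok(version_info=sys.version_info, minimum=(3, 9)):
    """Check that a (major, minor, ...) version is at least `minimum`."""
    return tuple(version_info[:2]) >= minimum


if __name__ == "__main__":
    missing = missing_tools(REQUIRED_TOOLS)
    if missing:
        print("Missing tools:", ", ".join(missing))
    else:
        print("All required tools were found on PATH.")
    if not python_version_ok():
        print("This demo expects Python 3.9.x.")
```

The check only looks for executables by name; it does not verify tool versions other than the running Python interpreter.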
- Sign up for a Confluent Cloud account here.
- After verifying your email address, access Confluent Cloud sign-in by navigating here.
- When prompted for a username and password, fill in your credentials. Note: If you're logging in for the first time, a wizard will offer to walk you through some tutorials. Minimize it, since this guide covers those steps.
This demo uses an Oracle Standard Edition database and Amazon Redshift cluster hosted on AWS that are publicly accessible. Terraform is used to provision resources on AWS so ensure your user has sufficient privileges.
- Create a free account on the Snowflake website.
- Your account must reside in the same region as your Confluent Cloud environment in order for the connector to work successfully. This demo is configured for aws-us-west-2.
This demo uses Terraform and bash scripting to create and tear down infrastructure and resources.
- Clone and enter this repo.
    git clone https://github.com/confluentinc/demo-change-data-capture.git
    cd demo-change-data-capture
- Create a .accounts file by running the following command.
    echo "CONFLUENT_CLOUD_EMAIL=add_your_email\nCONFLUENT_CLOUD_PASSWORD=add_your_password\nCONFLUENT_CLOUD_USER_FULL_NAME=\"add_user_full_name\"\nexport TF_VAR_confluent_cloud_api_key=\"add_your_api_key\"\nexport TF_VAR_confluent_cloud_api_secret=\"add_your_api_secret\"\nexport SNOWFLAKE_ACCOUNT=\"add_your_account_locator\"" > .accounts
  Note: This repo ignores the .accounts file.
- Create Confluent Cloud API keys by following this guide. Note: These are different from Kafka cluster API keys.
- Update the following variables in the .accounts file with your credentials.
    CONFLUENT_CLOUD_EMAIL=<replace>
    CONFLUENT_CLOUD_PASSWORD=<replace>
    CONFLUENT_CLOUD_USER_FULL_NAME=<replace>
    export TF_VAR_confluent_cloud_api_key="<replace>"
    export TF_VAR_confluent_cloud_api_secret="<replace>"
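A forgotten placeholder in the .accounts file tends to surface only later as a confusing Terraform or connector error. The sketch below is a hypothetical helper, not something shipped with the repo. It parses text in the KEY=VALUE format shown above and lists keys that still hold a placeholder. Treating values that start with `add_` or equal `<replace>` as placeholders is an assumption based on the sample values in this guide.

```python
def parse_accounts(text):
    """Parse KEY=VALUE lines, tolerating an `export ` prefix and double quotes."""
    values = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        if line.startswith("export "):
            line = line[len("export "):]
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip().strip('"')
    return values


def unfilled_keys(values):
    """Return the sorted keys whose values still look like placeholders."""
    return sorted(
        key
        for key, value in values.items()
        if value == "" or value == "<replace>" or value.startswith("add_")
    )


if __name__ == "__main__":
    # Illustrative content only; read your real .accounts file instead.
    sample = (
        "CONFLUENT_CLOUD_EMAIL=me@example.com\n"
        "CONFLUENT_CLOUD_PASSWORD=add_your_password\n"
        'export TF_VAR_confluent_cloud_api_key="<replace>"\n'
    )
    print(unfilled_keys(parse_accounts(sample)))
```

To check a real file, pass the result of `open(".accounts").read()` to `parse_accounts` from the repo root.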
- Navigate to the Snowflake directory.
    cd demo-change-data-capture/snowflake
- Create an RSA key for authentication. This creates the private and public keys you need to authenticate the service account for Terraform.
    openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out snowflake_tf_snow_key.p8 -nocrypt
    openssl rsa -in snowflake_tf_snow_key.p8 -pubout -out snowflake_tf_snow_key.pub
- Log in to the Snowflake console and create the user account by running the following command as the ACCOUNTADMIN role. But first:
  - Copy the text contents of the snowflake_tf_snow_key.pub file, starting after the PUBLIC KEY header and stopping just before the PUBLIC KEY footer.
  - Paste over the RSA_PUBLIC_KEY_HERE label (shown below).
- Execute the following SQL statements to create the user and grant it the SYSADMIN and SECURITYADMIN roles needed for account management.
    CREATE USER "tf-snow" RSA_PUBLIC_KEY='RSA_PUBLIC_KEY_HERE' DEFAULT_ROLE=PUBLIC MUST_CHANGE_PASSWORD=FALSE;
    GRANT ROLE SYSADMIN TO USER "tf-snow";
    GRANT ROLE SECURITYADMIN TO USER "tf-snow";
  Note: We grant the user SYSADMIN and SECURITYADMIN privileges to keep the lab simple. An important security best practice, however, is to limit all user accounts to least-privilege access. In a production environment, this key should also be secured with a secrets management solution like HashiCorp Vault, Azure Key Vault, or AWS Secrets Manager.
- Run the following to find the YOUR_ACCOUNT_LOCATOR and Snowflake Region ID values you need.
    SELECT current_account() as YOUR_ACCOUNT_LOCATOR, current_region() as YOUR_SNOWFLAKE_REGION_ID;
  Note: If your Snowflake account isn't in AWS-US-West-2, refer to the doc to identify your account locator.
- Update your .accounts file and add the newly created credentials for the following variable.
    export SNOWFLAKE_ACCOUNT="YOUR_ACCOUNT_LOCATOR"
- The tf-snow user account will be used by Terraform to create the following resources in Snowflake. All these resources will be deleted at the end of the demo when you run terraform destroy. However, tf-snow won't get deleted.
  - A new user account named TF_DEMO_USER and a new public and private key pair.
  - A warehouse named TF_DEMO.
  - A database named TF_DEMO.
  - All permissions needed for the demo.
  Note: For troubleshooting or more information, review the doc.
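The copy-and-paste step for the public key (everything between the PUBLIC KEY header and footer) is easy to get wrong by hand. As a rough sketch, assuming the standard PEM layout that `openssl rsa -pubout` writes, the following strips the header and footer lines and substitutes the result for the RSA_PUBLIC_KEY_HERE label in the CREATE USER statement from this guide. The function names are hypothetical, and the key in the example is a dummy value.

```python
# The CREATE USER statement from this guide, with its placeholder label.
CREATE_USER_TEMPLATE = (
    "CREATE USER \"tf-snow\" RSA_PUBLIC_KEY='RSA_PUBLIC_KEY_HERE' "
    "DEFAULT_ROLE=PUBLIC MUST_CHANGE_PASSWORD=FALSE;"
)


def pem_body(pem_text):
    """Return the PEM contents without the BEGIN/END marker lines."""
    lines = [line.strip() for line in pem_text.splitlines()]
    body = [line for line in lines if line and not line.startswith("-----")]
    return "\n".join(body)


def create_user_sql(public_key_body):
    """Paste the key body over the RSA_PUBLIC_KEY_HERE label."""
    return CREATE_USER_TEMPLATE.replace("RSA_PUBLIC_KEY_HERE", public_key_body)


if __name__ == "__main__":
    # Dummy PEM text for illustration; read snowflake_tf_snow_key.pub in practice.
    dummy_pem = (
        "-----BEGIN PUBLIC KEY-----\n"
        "AAAA\n"
        "BBBB\n"
        "-----END PUBLIC KEY-----\n"
    )
    print(create_user_sql(pem_body(dummy_pem)))
```

The sketch only prepares the SQL text; you still run the statement yourself in a Snowflake worksheet as the ACCOUNTADMIN role.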
- Navigate to the confluent directory of the project and run the create_env.sh script. This bash script copies the content of the .accounts file into a new file called .env and appends additional variables to it.
    cd demo-change-data-capture/confluent
    ./create_env.sh
- Source the .env file.
    source ../.env
  Note: If you don't source the .env file, you'll be prompted to provide the values manually on the command line when running Terraform commands.
- Log into your AWS account through the command line.
- Navigate to the repo's terraform directory.
    cd demo-change-data-capture/terraform
- Initialize Terraform within the directory.
    terraform init
- Create the Terraform plan.
    terraform plan
- Apply the plan to create the infrastructure. You can run terraform apply -auto-approve to bypass the approval prompt.
    terraform apply
  Note: Read the main.tf configuration file to see what will be created.
- Write the output of terraform to a JSON file. The setup.sh script will parse the JSON file to update the .env file.
    terraform output -json > ../resources.json
  Note: Verify that resources.json is created at the root level of the demo-change-data-capture directory.
- Run the setup.sh script.
    cd demo-change-data-capture/confluent
    ./setup.sh
- This script achieves the following:
  - Creates an API key pair that will be used in connectors' configuration files for authentication purposes.
  - Updates the .env file to replace the remaining variables with the newly generated values.
- Source the .env file.
    source ../.env
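To make the resources.json hand-off more concrete: `terraform output -json` writes one object per output, each with `sensitive`, `type`, and `value` fields. The sketch below shows one way such a file could be turned into `export` lines. It is an illustration only and does not claim to mirror what setup.sh actually does. The output names in the example are hypothetical.

```python
import json


def terraform_outputs(json_text):
    """Map each Terraform output name to its `value` field."""
    document = json.loads(json_text)
    return {name: entry["value"] for name, entry in document.items()}


def as_env_lines(outputs):
    """Render string-valued outputs as `export NAME="value"` lines."""
    return [
        f'export {name.upper()}="{value}"'
        for name, value in sorted(outputs.items())
        if isinstance(value, str)
    ]


if __name__ == "__main__":
    # Hypothetical output names; inspect your own resources.json for the real ones.
    sample = json.dumps(
        {
            "redshift_address": {
                "sensitive": False,
                "type": "string",
                "value": "host.example.com",
            },
            "port": {"sensitive": False, "type": "number", "value": 5439},
        }
    )
    for line in as_env_lines(terraform_outputs(sample)):
        print(line)
```

Non-string outputs are skipped here to keep the example short; a fuller version would decide how to serialize numbers, lists, and maps.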
- Run the following Python script to create and populate the DEMOGRAPHICS and CUSTOMERS tables, as well as enable Change Data Capture (CDC) on those tables.
    cd demo-change-data-capture/oracle
    python3 prepare_database.py
- Take a moment to inspect the files in the oracle directory to understand what just happened.
Confluent offers 120+ pre-built connectors, enabling you to modernize your entire data architecture even faster. These connectors also provide you peace-of-mind with enterprise-grade security, reliability, compatibility, and support.
You can use Confluent Cloud CLI to submit all the source connectors automatically.
Run a script that uses your .env file to generate real connector configuration JSON files from the example files located in the confluent folder.
    cd demo-change-data-capture/confluent
    ./create_connector_files.sh
You can create the connectors either through the CLI or the Confluent Cloud web UI.
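For readers curious about how example files become real configuration files, here is a minimal sketch of the general technique: substituting environment values into a JSON template. It assumes `${NAME}`-style placeholders and uses Python's `string.Template`. The actual create_connector_files.sh script may work differently, and the placeholder and field names below are illustrative only.

```python
import json
from string import Template


def render_connector_config(template_text, env):
    """Substitute ${NAME} placeholders, then parse the result as JSON.

    Raises KeyError if the template references a name missing from `env`.
    """
    rendered = Template(template_text).substitute(env)
    return json.loads(rendered)


if __name__ == "__main__":
    # Illustrative template; the real example files live in the confluent folder.
    template = '{"name": "demo", "kafka.api.key": "${CLUSTER_API_KEY}"}'
    config = render_connector_config(template, {"CLUSTER_API_KEY": "abc"})
    print(json.dumps(config, indent=2))
```

Using `substitute` rather than `safe_substitute` makes a missing variable fail loudly instead of leaving a placeholder in the generated file. Values containing double quotes or backslashes would need JSON escaping first, which this sketch does not handle.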
CLI
- Log into your Confluent account in the CLI.
    confluent login --save
- Use your environment and your cluster.
    confluent environment list
    confluent environment use <your_env_id>
    confluent kafka cluster list
    confluent kafka cluster use <your_cluster_id>
- Run the following commands to create the Oracle and PostgreSQL CDC Source connectors.
    cd demo-change-data-capture/confluent
    confluent connect cluster create --config-file actual_oracle_cdc.json
    confluent connect cluster create --config-file actual_postgres_products_source.json
Confluent Cloud Web UI
- Log into Confluent Cloud by navigating to https://confluent.cloud
- Step into Demo_Change_Data_Capture environment.
- If you are prompted with the Unlock advanced governance controls screen, click on No thanks, I will upgrade later.
Note: In this demo, you use the Advanced package for Stream Governance. However, you can take a moment to review the differences between the Essentials and Advanced packages.
- Step into demo_kafka_cluster.
- On the navigation menu, select Connectors and then + Add connector.
- In the search bar, search for Oracle and select the Oracle CDC Source Premium connector, which is a fully-managed connector.
- Create a new Oracle CDC Source Premium connector and complete the required fields using the actual_oracle_cdc.json file.
- Now search for and add a Postgres CDC Source connector using the actual_postgres_products_source.json file.
Once both are fully provisioned, check for and troubleshoot any failures that occur. Properly configured, each connector begins reading data automatically.
- The fully-managed Oracle CDC Source connector for Confluent Cloud captures each change to rows in a database and then represents the changes as change event records in Apache Kafka® topics. Up until now, the data in the Oracle database has been static: after the initial load, there haven't been any updates to the database. So the ORCL.ADMIN.DEMOGRAPHICS and ORCL.ADMIN.CUSTOMERS topics in Confluent Cloud are identical to the DEMOGRAPHICS and CUSTOMERS tables in Oracle.
- You can run the update_demographics.py script to see changes in real time. This script updates the country_code from US to USA for all customers every 5 seconds.
- Open a new terminal and run
    python3 oracle/update_demographics.py
- Navigate to the Confluent Cloud web UI and watch the changes stream in real time to the ORCL.ADMIN.DEMOGRAPHICS topic.
If all is well, it's time to transform and join your data using ksqlDB. Ensure your topics are receiving records first.
Note: All queries are available in ksqldb_queries.sql file.
- Navigate to the Confluent Cloud web UI and then go to the ksqlDB cluster.
- Set auto.offset.reset to earliest.
- Use the editor to execute the following queries.
- Use the following statement to consume demographics records.
    CREATE OR REPLACE STREAM demographics_composite (
        struct_key STRUCT<ID VARCHAR> KEY,
        ID VARCHAR,
        STREET_ADDRESS VARCHAR,
        STATE VARCHAR,
        ZIP_CODE VARCHAR,
        COUNTRY VARCHAR,
        COUNTRY_CODE VARCHAR
    ) WITH (KAFKA_TOPIC = 'ORCL.ADMIN.DEMOGRAPHICS', KEY_FORMAT='JSON', VALUE_FORMAT = 'JSON_SR');
- Verify the demographics_composite stream is populated correctly and then hit Stop.
    SELECT * FROM demographics_composite EMIT CHANGES;
- The type of the key field in demographics_composite is a STRUCT. To keep things simple, you want to flatten that field and then create a table. You'll achieve both by running the following query.
    CREATE TABLE demographics WITH (KAFKA_TOPIC='demographics', KEY_FORMAT='JSON', VALUE_FORMAT='JSON_SR') AS
    SELECT
        id,
        LATEST_BY_OFFSET(street_address) street_address,
        LATEST_BY_OFFSET(state) state,
        LATEST_BY_OFFSET(zip_code) zip_code,
        LATEST_BY_OFFSET(country) country,
        LATEST_BY_OFFSET(country_code) country_code
    FROM demographics_composite
    GROUP BY id
    EMIT CHANGES;
- Verify the demographics table is populated correctly and then hit Stop.
    SELECT * FROM demographics;
- Notice how the key field differs from the demographics_composite stream.
- Repeat the same process for customers.
    CREATE OR REPLACE STREAM customers_composite (
        struct_key STRUCT<ID VARCHAR> KEY,
        ID VARCHAR,
        FIRST_NAME VARCHAR,
        LAST_NAME VARCHAR,
        EMAIL VARCHAR,
        PHONE VARCHAR
    ) WITH (KAFKA_TOPIC = 'ORCL.ADMIN.CUSTOMERS', KEY_FORMAT='JSON', VALUE_FORMAT = 'JSON_SR');

    CREATE TABLE customers WITH (KAFKA_TOPIC='customers', KEY_FORMAT='JSON', VALUE_FORMAT='JSON_SR') AS
    SELECT
        id,
        LATEST_BY_OFFSET(first_name) first_name,
        LATEST_BY_OFFSET(last_name) last_name,
        LATEST_BY_OFFSET(email) email,
        LATEST_BY_OFFSET(phone) phone
    FROM customers_composite
    GROUP BY id
    EMIT CHANGES;
- You can now join customers and demographics by customer ID to create an up-to-the-second view of each record.
    CREATE TABLE customers_enriched WITH (KAFKA_TOPIC='customers_enriched', KEY_FORMAT='JSON', VALUE_FORMAT='JSON_SR') AS
    SELECT
        c.id id, c.first_name, c.last_name, c.email, c.phone,
        d.street_address, d.state, d.zip_code, d.country, d.country_code
    FROM customers c
    JOIN demographics d ON d.id = c.id
    EMIT CHANGES;
- Verify the customers_enriched table is populated correctly and then hit Stop.
    SELECT * FROM customers_enriched EMIT CHANGES;
- Next you will capture your products records and convert the record key to a simpler value.
    CREATE STREAM products_composite (
        struct_key STRUCT<product_id VARCHAR> KEY,
        product_id VARCHAR,
        `size` VARCHAR,
        product VARCHAR,
        department VARCHAR,
        price INT
    ) WITH (KAFKA_TOPIC='postgres.products.products', KEY_FORMAT='JSON', VALUE_FORMAT='JSON_SR', PARTITIONS=1, REPLICAS=3);

    CREATE STREAM products_rekeyed WITH (KAFKA_TOPIC='products_rekeyed', KEY_FORMAT='JSON', VALUE_FORMAT='JSON_SR') AS
    SELECT product_id, `size`, product, department, price
    FROM products_composite
    PARTITION BY product_id
    EMIT CHANGES;
- Verify the products_rekeyed stream is populated correctly and then hit Stop.
    SELECT * FROM products_rekeyed EMIT CHANGES;
- Create a ksqlDB table to show the most up-to-date values for each products record.
    CREATE TABLE products WITH (KAFKA_TOPIC='products', KEY_FORMAT='JSON', VALUE_FORMAT='JSON_SR') AS
    SELECT
        product_id,
        LATEST_BY_OFFSET(`size`) `size`,
        LATEST_BY_OFFSET(product) product,
        LATEST_BY_OFFSET(department) department,
        LATEST_BY_OFFSET(price) price
    FROM products_rekeyed
    GROUP BY product_id
    EMIT CHANGES;
- Verify the products table is populated correctly.
    SELECT * FROM products;
- Follow the same process using the orders data.
    CREATE STREAM orders_composite (
        order_key STRUCT<`order_id` VARCHAR> KEY,
        order_id VARCHAR,
        product_id VARCHAR,
        customer_id VARCHAR
    ) WITH (KAFKA_TOPIC='postgres.products.orders', KEY_FORMAT='JSON', VALUE_FORMAT='JSON_SR');

    CREATE STREAM orders_rekeyed WITH (KAFKA_TOPIC='orders_rekeyed', KEY_FORMAT='JSON', VALUE_FORMAT='JSON_SR') AS
    SELECT order_id, product_id, customer_id
    FROM orders_composite
    PARTITION BY order_id
    EMIT CHANGES;
- Verify the orders_rekeyed stream is populated correctly and then hit Stop.
    SELECT * FROM orders_rekeyed EMIT CHANGES;
- You're now ready to create a ksqlDB stream that joins the orders stream with the products and customers_enriched tables to create enriched order data in real time. You will stream this topic to your data warehouses later on.
    CREATE STREAM orders_enriched WITH (KAFKA_TOPIC='orders_enriched', KEY_FORMAT='JSON', VALUE_FORMAT='JSON_SR') AS
    SELECT
        o.order_id AS `order_id`,
        p.product_id AS `product_id`,
        p.`size` AS `size`,
        p.product AS `product`,
        p.department AS `department`,
        p.price AS `price`,
        c.id AS `customer_id`,
        c.first_name AS `first_name`,
        c.last_name AS `last_name`,
        c.email AS `email`,
        c.phone AS `phone`,
        c.street_address AS `street_address`,
        c.state AS `state`,
        c.zip_code AS `zip_code`,
        c.country AS `country`,
        c.country_code AS `country_code`
    FROM orders_rekeyed o
    JOIN products p ON o.product_id = p.product_id
    JOIN customers_enriched c ON o.customer_id = c.id
    PARTITION BY o.order_id
    EMIT CHANGES;
- Verify the orders_enriched stream is populated correctly and then hit Stop.
    SELECT * FROM orders_enriched EMIT CHANGES;
  Note: You need a stream to 'hydrate' your data warehouse once the sink connector is set up.
- Now you want to create a rewards table. Let's say you want to notify your customers as soon as they unlock a new status. You can do so by creating a table that is updated in real time based on each customer's purchases.
    CREATE TABLE rewards_status WITH (KAFKA_TOPIC='rewards_status', KEY_FORMAT='JSON', VALUE_FORMAT='JSON_SR') AS
    SELECT
        `customer_id`,
        `email`,
        SUM(`price`) AS `total`,
        CASE
            WHEN SUM(`price`) > 400 THEN 'GOLD'
            WHEN SUM(`price`) > 300 THEN 'SILVER'
            WHEN SUM(`price`) > 200 THEN 'BRONZE'
            ELSE 'CLIMBING'
        END AS `reward_level`
    FROM orders_enriched
    GROUP BY `customer_id`, `email`;
- Verify the rewards_status table is populated correctly and then hit Stop.
    SELECT * FROM rewards_status;
- You can either stream this data to an external system by leveraging a connector, or write a producer that publishes updates (for example, in-app notifications for a mobile app). The sky is the limit!
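To reason about the rewards thresholds outside ksqlDB, the aggregation in rewards_status can be mirrored in a few lines of Python. This is a batch approximation for illustration, not a replacement for the continuously updated table. It groups by customer_id and email, sums price, and applies the same strictly-greater-than thresholds as the CASE expression. The sample orders are made up.

```python
from collections import defaultdict


def reward_level(total):
    """Mirror the CASE expression: thresholds are strict (> not >=)."""
    if total > 400:
        return "GOLD"
    if total > 300:
        return "SILVER"
    if total > 200:
        return "BRONZE"
    return "CLIMBING"


def rewards_status(orders):
    """Sum price per (customer_id, email) and attach the reward level."""
    totals = defaultdict(int)
    for order in orders:
        totals[(order["customer_id"], order["email"])] += order["price"]
    return {
        key: {"total": total, "reward_level": reward_level(total)}
        for key, total in totals.items()
    }


if __name__ == "__main__":
    # Made-up orders for illustration only.
    orders = [
        {"customer_id": "c1", "email": "a@example.com", "price": 250},
        {"customer_id": "c1", "email": "a@example.com", "price": 200},
        {"customer_id": "c2", "email": "b@example.com", "price": 200},
    ]
    for key, status in rewards_status(orders).items():
        print(key, status)
```

Note the boundary behavior: a total of exactly 400 maps to SILVER and exactly 200 maps to CLIMBING, because each comparison is strict.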
Data portal is a self-service interface for discovering, exploring, and accessing Apache Kafka® topics on Confluent Cloud.
Building new streaming applications and pipelines on top of Kafka can be slow and inefficient when there is a lack of visibility into what data exists, where it comes from, and who can grant access. Data portal leverages Stream Catalog and Stream Lineage to empower data users to interact with their organization’s data streams efficiently and collaboratively.
- You will use a script to assign tags, business metadata, descriptions, and other relevant information to each topic.
- Run the update_topics.sh script.
    cd demo-change-data-capture/confluent
    ./update_topics.sh
- Take a moment to inspect the update_topics.sh script to understand what just happened.
- Log into the Confluent Cloud web UI and click on Data Portal in the left-hand menu.
- Use the drop-down to select Demo_Change_Data_Capture as your environment and see all recently modified topics.
- You can discover topics by using either the search box or the filters.
- For example, find all topics that are tagged as DataProduct.
- You can click on each topic to learn who owns it, how to contact the owner, review the schema, see events as they are produced, and more.
So far you have been experiencing Data Portal as an organization admin, which has the highest level of permissions. As an org admin, you can onboard new users and assign them the DataDiscovery role so they can browse topics and request access to them.
Add user
- Log into the Confluent Cloud web UI and click on the hamburger menu in the top-right corner of the screen, next to the ? icon.
- Click on Accounts & access and then on +Add user and enter an email address. If the recipient doesn't have a Confluent Cloud account, they will be invited to create one before being able to join your organization.
- Uncheck the Add DataDiscovery role checkbox, which is automatically selected. Click Continue.
- Select Demo_Change_Data_Capture as the Environment, click on +Add role assignment, select DataDiscovery, hit Add, and finally Confirm.
- Your screen should resemble the following.
- The new user will receive an email with instructions on how to join your organization.
- After the new user logs in, they can navigate to the Data Portal tab and discover all existing topics, review tags, business metadata, and more. However, they won't be able to see any messages until their request for access is approved.
- After you (an org admin) approve their request, they can see the messages in the preview panel. Furthermore, they can review all messages inside any topics they have permission to read or write.
You're now ready to sink data to Snowflake and Amazon Redshift.
You can create the connectors either through the CLI or the Confluent Cloud web UI.
CLI
- Log into your Confluent account in the CLI.
    confluent login --save
- Use your environment and your cluster.
    confluent environment list
    confluent environment use <your_env_id>
    confluent kafka cluster list
    confluent kafka cluster use <your_cluster_id>
- Run the following commands to create the Snowflake and Amazon Redshift Sink connectors.
    cd demo-change-data-capture/confluent
    confluent connect cluster create --config-file actual_snowflake_sink.json
    confluent connect cluster create --config-file actual_redshift_sink.json
Confluent Cloud Web UI
- Log into Confluent Cloud by navigating to https://confluent.cloud
- Step into Demo_Change_Data_Capture environment.
- Step into demo_kafka_cluster.
- On the navigation menu, select Connectors and then + Add connector.
- In the search bar, search for Snowflake and select the Snowflake Sink connector, which is a fully-managed connector.
- Create a new connector and complete the required fields using the actual_snowflake_sink.json file.
- Repeat the same process to add an Amazon Redshift Sink connector, completing the required fields using the actual_redshift_sink.json file.
Once the connectors are fully provisioned, check for and troubleshoot any failures that occur. Properly configured, each connector begins reading data automatically.
- Log into your Snowflake account.
- Create a new worksheet or use an existing one.
- Run the following commands.
    USE ROLE TF_DEMO_SVC_ROLE;
    USE WAREHOUSE TF_DEMO;
    ALTER WAREHOUSE TF_DEMO RESUME;
    USE DATABASE TF_DEMO;
    SELECT * FROM "TF_DEMO"."PUBLIC".ORDERS_ENRICHED LIMIT 100;
- You can flatten data in Snowflake if you wish; see Snowflake's documentation. You can also query JSON data directly in Snowflake by naming the column and specifying the fields of interest. For example:
    SELECT RECORD_CONTENT:email FROM "TF_DEMO"."PUBLIC".ORDERS_ENRICHED LIMIT 100;
  Note: To keep things simple in this demo, TF_DEMO_SVC_ROLE is given SECURITYADMIN-level permissions. However, you should always follow best practices in a production environment.
- Log into your AWS account.
- Ensure you are logged into the right region. This demo is based in Oregon.
- Use the search bar and look for Redshift.
- Step into the demo-cdc cluster.
- Click on the hamburger menu on the top left-hand side.
- Click on Query Editor v2 which should open up in a new tab.
- Click on demo-cdc --> demo_confluent --> public --> Tables --> orders_enriched.
Note: If you are prompted for a username and password, use aws.redshift.user and aws.redshift.password from the actual_redshift_sink.json file.
- Open a new worksheet and run the following query to see the results.
    SELECT * FROM "demo_confluent"."public"."orders_enriched" limit 100;
Congratulations on building your streaming data pipelines for a real-time data warehousing scenario in Confluent Cloud! Your complete pipeline should resemble the following one.

You want to delete any resources that were created during the demo so you don't incur additional charges.
- Run the following command to delete all connectors.
    cd demo-change-data-capture/confluent
    ./teardown_connectors.sh
- Run the following command to delete all resources created by Terraform. You can run terraform destroy -auto-approve to bypass the approval step.
    cd demo-change-data-capture/terraform
    terraform destroy
If you created a Terraform user in Snowflake solely to run this lab, you can remove it.
- Log into your Snowflake account and use a worksheet to delete tf-snow by running
    DROP USER "tf-snow";
- Learn more about change data capture here
- Learn more about Streaming Data Pipelines here
- Learn more about Data Portal here
- Watch Stream Governance course here
- Try more demos:
- Real-time data warehousing https://github.com/confluentinc/demo-realtime-data-warehousing
- Streaming Data Pipelines to Cloud Databases https://github.com/confluentinc/demo-database-modernization
- Stream Designer, Confluent's visual canvas for rapidly building, testing, and deploying streaming data pipelines powered by Kafka https://github.com/confluentinc/demo-current-stream-designer
- Application Modernization https://github.com/confluentinc/demo-application-modernization
 





