Stream Data to Cloud Databases with Confluent

Amid unprecedented volumes of data being generated, organizations need to harness the value of their data from heterogeneous systems in real time. However, on-prem databases are slow, rigid, and expensive to maintain, limiting the speed at which businesses can scale and drive innovation. Today’s organizations need scalable, cloud-native databases with real-time data. This demo walks you through building streaming data pipelines with Confluent Cloud. You’ll learn about:

  • Confluent’s fully managed Oracle CDC Source connector to stream customer data into Confluent Cloud in real time while masking sensitive information with Single Message Transforms
  • A Python producer to send credit card transaction messages to a Kafka topic
  • Flink SQL to process and enrich data streams in real time, using aggregations and windowing to build a list of customers whose credit cards may have been stolen
  • A fully managed sink connector to load enriched data into MongoDB Atlas for real-time fraud analysis

Break down data silos and stream on-premises, hybrid, and multicloud data to cloud databases such as MongoDB Atlas, Azure Cosmos DB and more, so that every system and application has a consistent, up-to-date, and enhanced view of the data at all times. With Confluent streaming data pipelines, you can connect, process, and govern real-time data for all of your databases. Unlock real-time insights, focus on building innovative apps instead of managing databases, and confidently pave a path to cloud migration and transformation.

To learn more about Confluent’s solution, visit the Database streaming pipelines page

Architecture Diagram

This demo leverages two fully-managed connectors (Oracle CDC Source Premium and MongoDB Atlas Sink).


Requirements

In order to successfully complete this demo, you need to install a few tools before getting started.

  • If you don't have a Confluent Cloud account, sign up for a free trial here.

  • Install the Confluent Cloud CLI by following the instructions here.

  • AWS account with permissions to create resources. Sign up for an account here.

  • Install the Oracle DB driver here.

  • This demo uses Python 3.9.x.

  • This demo uses the pyodbc and faker modules. You can install these modules through pip.

    pip3 install pyodbc faker
    

    Note: This demo was built and validated on a Mac (x86).

  • Download and install Terraform here.

Prerequisites

Confluent Cloud

  1. Sign up for a Confluent Cloud account here.

  2. After verifying your email address, access Confluent Cloud sign-in by navigating here.

  3. When provided with the username and password prompts, fill in your credentials.

    Note: If you're logging in for the first time, you will see a wizard that walks you through some tutorials. Minimize it, as this guide covers those steps.

AWS

This demo uses an Oracle Standard Edition database hosted on AWS that is publicly accessible.

MongoDB Atlas

Sign up for a free MongoDB Atlas account here.


Setup

  1. This demo uses Terraform and bash scripting to create and tear down infrastructure and resources.

  2. Clone and enter this repository.

    git clone https://github.com/confluentinc/demo-database-modernization.git
    cd demo-database-modernization/streaming-data-pipelines-with-flink-sql
  3. Create an .accounts file by running the following command.

    echo "CONFLUENT_CLOUD_EMAIL=add_your_email\nCONFLUENT_CLOUD_PASSWORD=add_your_password\nexport TF_VAR_confluent_cloud_api_key=\"add_your_api_key\"\nexport TF_VAR_confluent_cloud_api_secret=\"add_your_api_secret\"\nexport TF_VAR_mongodbatlas_public_key=\"add_your_public_key\"\nexport TF_VAR_mongodbatlas_private_key=\"add_your_private_key\"\nexport TF_VAR_mongodbatlas_org_id=\"add_your_org_id\"" > .accounts

    Note: This repo ignores the .accounts file.

Confluent Cloud

Create Confluent Cloud API keys by following this guide.

Note: This is different from the Kafka cluster API keys.

MongoDB Atlas

Create an API key pair so Terraform can create resources in the Atlas cluster. Follow the instructions here.

Update the following variables in the .accounts file with your credentials.

 CONFLUENT_CLOUD_EMAIL=<replace>
 CONFLUENT_CLOUD_PASSWORD=<replace>
 export TF_VAR_confluent_cloud_api_key="<replace>"
 export TF_VAR_confluent_cloud_api_secret="<replace>"
 export TF_VAR_mongodbatlas_public_key="<replace>"
 export TF_VAR_mongodbatlas_private_key="<replace>"
 export TF_VAR_mongodbatlas_org_id="<replace>"

Create a local environment file

  1. Navigate to the confluent directory of the project and run the create_env.sh script. This bash script copies the contents of the .accounts file into a new file called .env and appends additional variables to it.

    cd streaming-data-pipelines-with-flink-sql/confluent
    ./create_env.sh
  2. Source .env file.

    source ../.env

    Note: If you don't source the .env file, you'll be prompted to provide the values manually on the command line when running Terraform commands.

Build your cloud infrastructure

  1. Log into your AWS account through the command line.

  2. Navigate to the repo's terraform directory.

    cd streaming-data-pipelines-with-flink-sql/terraform
  3. Initialize Terraform within the directory.

    terraform init
  4. Create the Terraform plan.

    terraform plan
  5. Apply the plan to create the infrastructure. You can run terraform apply -auto-approve to bypass the approval prompt.

    terraform apply

    Note: Read the main.tf configuration file to see what will be created.

  6. Write the Terraform output to a JSON file. The setup.sh script parses this file to update the .env file.

    terraform output -json > ../resources.json
  7. Run the setup.sh script.

    cd streaming-data-pipelines-with-flink-sql/confluent
    ./setup.sh
  8. This script achieves the following:

    • Creates an API key pair that will be used in connectors' configuration files for authentication purposes.
    • Updates the .env file to replace the remaining variables with the newly generated values.
  9. Source .env file.

    source ../.env

Prepare the Database for Change Data Capture

  1. Run the following Python script to create and populate a CUSTOMERS table, as well as enable Change Data Capture (CDC) on that table.

    cd streaming-data-pipelines-with-flink-sql/oracle
    python3 prepare_database.py
  2. Take a moment to inspect the files in the oracle directory to understand what just happened. A simplified, hypothetical sketch of the script follows this list.
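
For orientation, the sketch below shows what such a script can look like using pyodbc and Faker. It is a minimal, hypothetical sketch: the connection string, column list, and row count are placeholders, and the actual prepare_database.py reads its connection details from the Terraform output and may define the table differently.

    import pyodbc
    from faker import Faker

    fake = Faker()

    # Placeholder DSN -- the ODBC driver name and host depend on your Oracle driver
    # installation; the real script reads these details from the Terraform output.
    conn = pyodbc.connect(
        "DRIVER={Oracle ODBC Driver};DBQ=<oracle_host>:1521/ORCL;UID=admin;PWD=<password>"
    )
    conn.autocommit = True
    cur = conn.cursor()

    # Create a simple CUSTOMERS table (columns are illustrative).
    cur.execute("""
        CREATE TABLE ADMIN.CUSTOMERS (
            ID NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
            FIRST_NAME VARCHAR2(50),
            LAST_NAME VARCHAR2(50),
            EMAIL VARCHAR2(100),
            AVG_CREDIT_SPEND NUMBER(10, 2)
        )
    """)

    # Populate it with synthetic customers generated by Faker.
    for _ in range(100):
        cur.execute(
            "INSERT INTO ADMIN.CUSTOMERS (FIRST_NAME, LAST_NAME, EMAIL, AVG_CREDIT_SPEND) "
            "VALUES (?, ?, ?, ?)",
            fake.first_name(),
            fake.last_name(),
            fake.email(),
            float(fake.pyfloat(right_digits=2, min_value=100, max_value=5000)),
        )

    # Enable supplemental logging so the Oracle CDC connector can capture row-level changes.
    cur.execute("ALTER TABLE ADMIN.CUSTOMERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS")

The key step for CDC is the final ALTER TABLE statement: supplemental logging is what allows the connector to read complete row-level change records for this table.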

Demo

Configure Source Connector

Confluent offers 120+ pre-built connectors, enabling you to modernize your entire data architecture even faster. These connectors also provide you peace-of-mind with enterprise-grade security, reliability, compatibility, and support.

Automated Connector Configuration File Creation

You can use the Confluent Cloud CLI to submit all the source connectors automatically.

Run the following script, which uses your .env file to generate actual connector configuration JSON files from the example files located in the confluent folder.

cd streaming-data-pipelines-with-flink-sql/confluent
./create_connector_files.sh

Configure Oracle CDC Source Premium Source Connectors

You can create the connector either through CLI or Confluent Cloud web UI.

CLI
  1. Log into your Confluent account in the CLI.

    confluent login --save
  2. Use your environment and your cluster.

    confluent environment list
    confluent environment use <your_env_id>
    confluent kafka cluster list
    confluent kafka cluster use <your_cluster_id>
  3. Run the following commands to create the Oracle CDC Source Premium connector.

    cd streaming-data-pipelines-with-flink-sql/confluent
    confluent connect cluster create --config-file actual_oracle_cdc.json

Confluent Cloud Web UI
  1. Log into Confluent Cloud by navigating to https://confluent.cloud
  2. Step into Demo_Database_Modernization environment.
  3. If you are prompted with the Unlock advanced governance controls screen, click on No thanks, I will upgrade later.

    Note: In this demo, the Essentials package for Stream Governance is sufficient. However, you can take a moment to review the differences between the Essentials and Advanced packages.

  4. Step into demo_kafka_cluster.
  5. On the navigation menu, select Connectors and then + Add connector.
  6. In the search bar, search for Oracle and select Oracle CDC Source Premium, which is a fully managed connector.
  7. Create a new Oracle CDC Source Premium connector and complete the required fields using the actual_oracle_cdc.json file.

Once the connector is fully provisioned, check for and troubleshoot any failures. Properly configured, the connector begins reading data automatically.

In this demo, we are using Apache Kafka's Single Message Transforms (SMT) to mask a customer PII field before the data streams into Confluent Cloud. For more information on SMTs, refer to our documentation.

Update Customer Information in Oracle Database

The fully-managed Oracle CDC Source connector for Confluent Cloud captures each change to rows in a database and then represents the changes as change event records in Apache Kafka® topics. You can make changes to the source database (Oracle) and see the updated messages in Confluent Cloud's topic.

  1. Navigate to confluent.cloud → Topics → ORCL.ADMIN.CUSTOMERS → Messages and keep the page open to see the update.

  2. Run a Python script to increase Rica Blaisdell's average credit spend by $1 every 5 seconds. Leave this script running throughout the demo. A simplified sketch of the script follows this list.

    cd streaming-data-pipelines-with-flink-sql/oracle
    python3 update_user.py
  3. Back in the Confluent Cloud console, verify that Rica Blaisdell's average credit spend has been updated.
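
Conceptually, the update loop is as simple as the following sketch. The connection string is a placeholder and the real update_user.py reads its settings from the environment, but it illustrates why new change events keep appearing in the topic every few seconds.

    import time

    import pyodbc

    # Placeholder DSN -- the real script reads host and credentials from the environment.
    conn = pyodbc.connect(
        "DRIVER={Oracle ODBC Driver};DBQ=<oracle_host>:1521/ORCL;UID=admin;PWD=<password>"
    )
    conn.autocommit = True
    cur = conn.cursor()

    # Bump Rica Blaisdell's average credit spend by $1 every 5 seconds so the CDC
    # connector continuously emits change events to the ORCL.ADMIN.CUSTOMERS topic.
    while True:
        cur.execute(
            "UPDATE ADMIN.CUSTOMERS "
            "SET AVG_CREDIT_SPEND = AVG_CREDIT_SPEND + 1 "
            "WHERE FIRST_NAME = 'Rica' AND LAST_NAME = 'Blaisdell'"
        )
        time.sleep(5)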

Submit new credit card transactions

In a real-world scenario, credit card transactions might come from a web application, a database, or other sources. To keep things simple in this demo, you'll use a Python producer to generate sample credit card transactions.

  1. Open a new Terminal window.

  2. Run the creditcard_send.py script.

    cd streaming-data-pipelines-with-flink-sql/cc_transaction_scripts
    python3 creditcard_send.py
  3. This script generates a sample credit card transaction every second using the Faker library. The messages are serialized with a JSON schema, and Schema Registry keeps track of its evolution. Schema Registry provides a centralized repository for managing and validating schemas for topic message data, and for serialization and deserialization of the data over the network. A simplified sketch of such a producer follows this list.
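
The sketch below shows the general shape of such a producer using confluent-kafka-python's SerializingProducer with a JSONSerializer backed by Schema Registry. The topic name credit_card_transactions matches the table queried later in Flink; the schema fields and placeholder credentials are illustrative, and the actual creditcard_send.py reads its configuration from the generated .env file and may define a richer schema.

    import json
    import time
    from datetime import datetime, timezone

    from confluent_kafka import SerializingProducer
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry.json_schema import JSONSerializer
    from confluent_kafka.serialization import StringSerializer
    from faker import Faker

    fake = Faker()

    # Illustrative JSON schema; the real script may define additional fields.
    schema_str = json.dumps({
        "$schema": "http://json-schema.org/draft-07/schema#",
        "title": "CreditCardTransaction",
        "type": "object",
        "properties": {
            "user_id": {"type": "integer"},
            "credit_card_number": {"type": "string"},
            "amount": {"type": "number"},
            "transaction_timestamp": {"type": "string"},
        },
    })

    # Placeholder endpoints and credentials; the real script reads these from the .env file.
    schema_registry = SchemaRegistryClient({
        "url": "<SCHEMA_REGISTRY_URL>",
        "basic.auth.user.info": "<SR_API_KEY>:<SR_API_SECRET>",
    })

    producer = SerializingProducer({
        "bootstrap.servers": "<BOOTSTRAP_SERVERS>",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "<KAFKA_API_KEY>",
        "sasl.password": "<KAFKA_API_SECRET>",
        "key.serializer": StringSerializer("utf_8"),
        "value.serializer": JSONSerializer(schema_str, schema_registry),
    })

    # Emit one synthetic transaction per second.
    while True:
        txn = {
            "user_id": fake.random_int(min=1, max=100),
            "credit_card_number": fake.credit_card_number(),
            "amount": fake.pyfloat(right_digits=2, min_value=10, max_value=2000),
            "transaction_timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
        }
        producer.produce(topic="credit_card_transactions", key=str(txn["user_id"]), value=txn)
        producer.poll(0)  # serve delivery callbacks
        time.sleep(1)

Because the value serializer registers the JSON schema with Schema Registry on first use, downstream consumers (including Flink) can rely on a consistent, versioned schema for the topic.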

Enrich Data Streams with Flink SQL

Now that you have data flowing through Confluent, you can easily build stream processing applications using Flink SQL: continuously transform, enrich, join, and aggregate your data with SQL syntax and gain value from it directly in Confluent, in real time. Confluent Cloud for Flink also provides a truly cloud-native experience: you don't need to know about or interact with Flink clusters, state backends, checkpointing, or any of the other aspects usually involved in operating a production-ready Flink deployment.

If you’re interested in learning more about Flink, you can take the Apache Flink 101 course on Confluent Developer website.

  1. Log into Confluent Cloud web UI, then click on Demo_Database_Modernization environment.

  2. Click on Flink (preview) and then Open SQL workspace.

  3. In the top-right corner of your workspace, select Demo_Database_Modernization as the catalog and demo_kafka_cluster as your database.

    Refer to the docs to understand the mapping between Kafka and Flink.

  4. On the left-hand side, under the Navigator menu, click the arrow to expand the Demo_Database_Modernization Kafka environment, then expand demo_kafka_cluster to see the existing Kafka topics. It should resemble the following image.

  5. You will use the code editor to query existing Flink tables (Kafka topics) and to write new queries.

  6. To write a new query, click on the + icon to create a new cell.

    Note: For your convenience, all Flink queries are available in the flink-queries.sql file.

  7. Query the ORCL.ADMIN.CUSTOMERS table and then hit Stop.

    SELECT * FROM `ORCL.ADMIN.CUSTOMERS`;

    Note: This topic is named <database_name>.<schema_name>.<table_name>, and you need the backticks; otherwise, Flink SQL assumes the dots refer to a specific catalog and database.

  8. Add a new cell and query the credit_card_transactions table and then hit Stop.

    SELECT * FROM credit_card_transactions;
  9. Add a new cell. You will create a new table and join customer information with credit card transactions, matching on the customer's ID. The transaction_timestamp is used as the watermark strategy, and the CAST function converts transaction_timestamp from STRING to TIMESTAMP.

    CREATE TABLE fd_transactions_enriched(
       `user_id` BIGINT,
       `credit_card_number` STRING,
       `amount` DOUBLE,
       `transaction_timestamp` TIMESTAMP(0),
       `first_name` STRING,
       `last_name` STRING,
       `email` STRING,
       `avg_credit_spend` DOUBLE,
       WATERMARK FOR transaction_timestamp AS transaction_timestamp
    );
    
    INSERT INTO fd_transactions_enriched
       SELECT T.user_id,
          T.credit_card_number,
          T.amount,
          CAST(T.transaction_timestamp AS TIMESTAMP),
          C.FIRST_NAME,
          C.LAST_NAME,
          C.EMAIL,
          C.AVG_CREDIT_SPEND
       FROM credit_card_transactions T
       INNER JOIN `ORCL.ADMIN.CUSTOMERS` C
       ON T.user_id = C.ID;
  10. Add a new cell and query the newly created fd_transactions_enriched table and verify it's populated correctly.

    SELECT * FROM fd_transactions_enriched;
  11. Add a new cell and create a new table. You'll aggregate the stream of transactions for each account ID using a two-hour tumbling window, and filter for accounts in which the total spend in a two-hour period is greater than the customer’s average.

    CREATE TABLE fd_possible_stolen_card(
       `window_start` TIMESTAMP(0),
       `window_end` TIMESTAMP(0),
       `user_id` BIGINT,
       `credit_card_number` STRING,
       `first_name` STRING,
       `last_name` STRING,
       `email` STRING,
       `transaction_timestamp` TIMESTAMP(0),
       `total_credit_spend` DOUBLE,
       `avg_credit_spend` DOUBLE
    );
    INSERT INTO fd_possible_stolen_card
       SELECT window_start, window_end, user_id, credit_card_number, first_name, last_name, email, transaction_timestamp, SUM(amount) AS total_credit_spend, MAX(avg_credit_spend) as avg_credit_spend
       FROM TABLE (
          TUMBLE(TABLE fd_transactions_enriched, DESCRIPTOR(transaction_timestamp), INTERVAL '2' HOUR ))
       GROUP BY window_start, window_end, user_id, credit_card_number, first_name, last_name, email, transaction_timestamp
       HAVING SUM(amount) > MAX(avg_credit_spend);
  12. Add a new cell and verify the newly created table is being populated correctly.

    SELECT * FROM fd_possible_stolen_card;

Connect MongoDB Atlas to Confluent Cloud

You can create the MongoDB Atlas Sink connector either through CLI or Confluent Cloud web UI.

CLI
  1. Run the following command to create the MongoDB Atlas Sink connector.

    confluent connect cluster create --config-file confluent/actual_mongodb_sink.json

Confluent Cloud Web UI
  1. On the navigation menu, select Connectors and + Add connector.
  2. In the search bar, search for MongoDB and select the MongoDB Atlas Sink, which is a fully managed connector.
  3. Create a new MongoDB Atlas Sink connector and complete the required fields using the actual_mongodb_sink.json file.

Once the connector is in the Running state, navigate to cloud.mongodb.com → Collections → demo-db-mod.fd_possible_stolen_card and verify messages are showing up correctly.

Refer to our documentation for detailed instructions about this connector.

Confluent Cloud Stream Governance

Confluent offers data governance tools such as Stream Quality, Stream Catalog, and Stream Lineage in a package called Stream Governance. These features ensure your data is high quality, observable, and discoverable. Learn more about Stream Governance here and refer to the docs page for detailed information.

  1. Navigate to https://confluent.cloud

  2. Use the left-hand side menu and click on Stream Lineage. Stream Lineage provides a graphical UI of the end-to-end flow of your data, both from a bird's-eye view and with drill-down magnification, for answering questions like:

    • Where did data come from?
    • Where is it going?
    • Where, when, and how was it transformed?

    In the bird's-eye view, you can see how one stream feeds into another. As your pipeline grows and becomes more complex, you can use Stream Lineage to debug and see where things break.


CONGRATULATIONS

Congratulations on building your streaming data pipeline to cloud databases in Confluent Cloud! Your complete pipeline should resemble the following one.


Teardown

Delete any resources created during the demo so you don't incur additional charges.

Credit Card Transactions Script

Go back to the terminal window where creditcard_send.py is running and quit it with Ctrl+C.

Oracle Script

Go back to the terminal window where update_user.py is running and quit it with Ctrl+C.

Infrastructure

  1. Run the following command to delete all connectors

    cd streaming-data-pipelines-with-flink-sql/confluent
    ./teardown_connectors.sh
  2. Run the following command to delete all resources created by Terraform

    cd streaming-data-pipelines-with-flink-sql/terraform
    terraform destroy

References

  1. Database modernization with Confluent Cloud blog
  2. Peering Connections in Confluent Cloud doc
  3. Oracle CDC Source Connector for Confluent Cloud doc
  4. Single Message Transforms for Managed Connectors doc
  5. MongoDB Atlas Sink Connector for Confluent Cloud doc
  6. Stream Governance page and doc