This project provides real-time click attribution and near real-time insights into e-commerce business performance.
- Introduction
- Prerequisites
- Setup and Run
- Architecture
- Data Flow
- Streaming Concept
- Project Overview
- Visualization
- Acknowledgements
An e-commerce website processes many requests daily, and the company wants to identify which clicks lead to checkouts so it can evaluate its marketing efficacy. The company also requires near real-time insights into business performance.
We implement First Click Attribution: each checkout is attributed to the earliest click made by the same user within the hour before the checkout. For example, if a user clicks a TikTok ad at 10:00, clicks a Google ad at 10:30, and checks out at 10:45, the checkout is attributed to the TikTok click.
(For more on click attribution, see here)
Below is a list of technologies used in this project:
| Component | Description | URL |
|---|---|---|
| Flink | Stream processing | http://localhost:8081 |
| Kafka | Data streaming | |
| Grafana | Visualization | http://localhost:3000 |
| PostgreSQL | OLTP database | |
| Docker | Containerizing the project | |
| Python | Programming language | |
Docker is installed, with at least 4 GB of RAM allocated to it.
- Pull the project from the repository:
  `git clone https://github.com/Quocc1/Stream_Processing`
- Start the Docker engine.
- Change to the project directory, then spin up the containers with docker-compose:
  `cd Stream_Processing`
  `make up`
- Wait for docker-compose to finish, then run:
  `make run`
Note: Run `make help` or refer to the Makefile for details on commands and execution. Use `make down` to stop the containers.
If you encounter issues running the Makefile on Windows, refer to this Stack Overflow post for potential solutions.
The diagram illustrates the conceptual view of the streaming pipeline (from left to right).
- Data is generated and sent to Kafka topics.
- Flink retrieves this data and performs operations such as enrichment, joining, filtering, recalculating, and aggregation.
- Flink then forwards the processed results to the PostgreSQL sink.
- Finally, Grafana pulls processed aggregate data for near real-time visualization.
For each successful attribution, we aim to identify which source the user clicked on, such as TikTok Ads, Google Ads, Facebook posts, etc. This evaluation helps determine the most effective advertising platform in the last hour.
Additionally, we evaluate real-time revenue and profit status, most popular categories, most common failed payment reasons, and customer gender distribution, all within the last hour.
(See details in the Visualization section below)
Source: Data is generated and sent to two topics: "Clicks" and "Checkouts". Flink listens to these topics and stores the data in source tables (see the sketch below).
Transformation: After data ingestion, it undergoes a series of transformations including enriching checkout data, filtering out click records older than the last hour, and calculating profit and revenue.
Processing: Checkout attribution is performed between the "Checkouts" and "Clicks" topics using a stream-stream join operation. The result is sent to a PostgreSQL sink.
Sink: The sink receives the processed data and either persists it or forwards it to Grafana for visualization.
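To make the source layer concrete, the sketch below shows roughly what a Kafka-backed Flink SQL source table can look like. This is a hedged illustration only, not the project's exact DDL (the real definitions live in code/source/): the column names, broker address, and connector options are assumptions.

```sql
-- Hypothetical sketch of a Kafka-backed source table (see code/source/clicks.sql for the real one).
CREATE TABLE clicks (
    click_id             STRING,
    user_id              STRING,
    click_source         STRING,                  -- e.g. TikTok Ads, Google Ads (assumed column)
    click_timestamp      TIMESTAMP(3),            -- event time taken from the generated data
    processing_timestamp AS PROCTIME(),           -- processing time assigned by Flink
    WATERMARK FOR click_timestamp AS click_timestamp - INTERVAL '15' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'clicks',
    'properties.bootstrap.servers' = 'kafka:9092',  -- assumed broker address from docker-compose
    'properties.group.id' = 'flink-clicks',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
```

The checkouts source table follows the same pattern on the "checkouts" topic.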
The diagram below illustrates the data flow in Flink.
Understanding time attributes in stream processing is crucial, especially considering the possibility of out-of-order data arrival. There are two main types of time in stream processing:
Event time: The time when the event occurred, typically provided by the system generating the data.
Processing time: The time when the event is processed by the stream processing system.
As events may arrive late, we need a mechanism to inform Apache Flink how long to wait for an event before considering it for processing. This is where watermarking comes into play.
Watermarking: A mechanism that tells the stream processing system how long to keep waiting for late-arriving events before considering the stream complete up to a given event time. For example, with a watermark delay of 5 minutes, Apache Flink continues to accept events that arrive up to 5 minutes late relative to the latest observed event time; events arriving later than that are ignored. In this project, the watermark is declared directly in the source table definitions:
checkout_timestamp TIMESTAMP(3),  -- event time, provided by the data generator
processing_timestamp AS PROCTIME(),  -- processing time, assigned by Flink
WATERMARK FOR checkout_timestamp AS checkout_timestamp - INTERVAL '15' SECOND  -- tolerate events up to 15 seconds late
In our project, we derive event time from the source data. We define a watermark of 15 seconds on the event time. This means that our clicks and checkouts tables will wait for 15 seconds before being considered complete. Any events arriving later than 15 seconds will not be considered for computation. This ensures that our processing is robust and accurate, even in the presence of late-arriving data.
In this project, we utilize the interval join in Flink, which allows us to join data streams based on a specified time range condition. The interval join is particularly useful when we need to ensure that one stream's time falls within a certain range of time of another stream.
For more details on interval joins or other joins in Flink, refer to the official documentation.
Here's an example of an interval join query used in our project:
FROM checkouts AS co
JOIN users FOR SYSTEM_TIME AS OF co.processing_timestamp AS u ON co.user_id = u.user_id
JOIN products FOR SYSTEM_TIME AS OF co.processing_timestamp AS p ON co.product_id = p.product_id
-- Interval join: keep only clicks that occurred within the hour before the checkout
LEFT JOIN clicks AS cl ON co.user_id = cl.user_id
    AND co.checkout_timestamp BETWEEN cl.click_timestamp
    AND cl.click_timestamp + INTERVAL '1' HOUR
In this query, we match checkouts with clicks based on the condition that the checkout occurred within 1 hour after the click time. This ensures that we only consider clicks that happened within the last hour relative to the checkout time.
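Putting the pieces together, a simplified version of the attribution step could look like the sketch below: join each checkout to its candidate clicks from the preceding hour and keep only the earliest (first) click per checkout. This is a hedged sketch, not the project's exact query (that lives in code/process/insert_into_sink.sql); the enrichment joins on users and products are omitted, and the table and column names are assumptions.

```sql
-- Hypothetical sketch of first-click attribution; the real query is in code/process/insert_into_sink.sql.
INSERT INTO checkout_attribution
SELECT checkout_id, user_id, click_id, click_source, checkout_timestamp
FROM (
    SELECT
        co.checkout_id,
        co.user_id,
        cl.click_id,
        cl.click_source,
        co.checkout_timestamp,
        -- Rank the candidate clicks for each checkout by click time; rn = 1 is the first click.
        ROW_NUMBER() OVER (
            PARTITION BY co.checkout_id
            ORDER BY cl.click_timestamp ASC
        ) AS rn
    FROM checkouts AS co
    LEFT JOIN clicks AS cl ON co.user_id = cl.user_id
        AND co.checkout_timestamp BETWEEN cl.click_timestamp
        AND cl.click_timestamp + INTERVAL '1' HOUR
)
WHERE rn = 1;
```

Flink treats this ROW_NUMBER pattern as a Top-N query, so at most one attributed click per checkout ends up in the sink.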
Stream_Processing
├── assets/
│ └── pictures
├── code/
│ ├── generate_data/
│ │ └── gen_data.py
│ ├── process/
│ │ └── insert_into_sink.sql
│ ├── sink/
│ │ └── checkout_attribution.sql
│ ├── source/
│ │ ├── checkouts.sql
│ │ ├── clicks.sql
│ │ ├── products.sql
│ │ └── users.sql
│ └── main.py
├── container/
│ ├── flink/
│ │ ├── Dockerfile
│ │ └── requirements.txt
│ └── generate_data/
│ ├── Dockerfile
│ └── requirements.txt
├── data/
│ ├── Products.csv
│ └── Users.csv
├── grafana/
│ └── provisioning/
│ ├── dashboards/
│ │ ├── dashboard.json
│ │ └── dashboard.yml
│ └── datasources/
│ └── postgres.yml
├── postgres/
│ └── init.sql
├── .gitignore
├── docker-compose.yaml
├── Makefile
└── README.md
├── generate_data/
│ └── gen_data.py
gen_data.py: Generates data and publishes it to the Kafka topics "clicks" and "checkouts".
├── source/
│ ├── checkouts.sql
│ ├── clicks.sql
│ ├── products.sql
│ └── users.sql
checkouts.sql: Defines the source table that reads from the Kafka "checkouts" topic; the watermark is set to 15 seconds.
clicks.sql: Defines the source table that reads from the Kafka "clicks" topic; the watermark is set to 15 seconds.
products.sql and users.sql: Define temporary tables for streaming joins.
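For context, the temporal `FOR SYSTEM_TIME AS OF co.processing_timestamp` joins shown earlier typically look up dimension rows from an external table. One plausible shape for such a table is a JDBC-backed definition like the hedged sketch below; the connection details and columns are assumptions, and the project's actual definitions are in users.sql and products.sql.

```sql
-- Hypothetical sketch of a JDBC-backed lookup table for the temporal join (see code/source/users.sql).
CREATE TABLE users (
    user_id STRING,
    gender  STRING,                                        -- assumed column, used for the gender panel
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres:5432/postgres',    -- assumed connection string from docker-compose
    'table-name' = 'users',
    'username' = 'postgres',                               -- assumed credentials
    'password' = 'postgres'
);
```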
├── sink/
│ └── checkout_attribution.sql
checkout_attribution.sql: Defines the sink table that stores the final results of the stream join.
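A PostgreSQL sink table in Flink SQL is typically declared with the JDBC connector as well. The sketch below is a hedged illustration with assumed column names; the actual definition is in checkout_attribution.sql.

```sql
-- Hypothetical sketch of the JDBC sink table (see code/sink/checkout_attribution.sql for the real one).
CREATE TABLE checkout_attribution (
    checkout_id        STRING,
    user_id            STRING,
    click_id           STRING,
    click_source       STRING,
    checkout_timestamp TIMESTAMP(3),
    PRIMARY KEY (checkout_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres:5432/postgres',  -- assumed connection string
    'table-name' = 'checkout_attribution',
    'username' = 'postgres',
    'password' = 'postgres'
);
```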
├── process/
│ └── insert_into_sink.sql
insert_into_sink.sql:
- Defines the SQL that processes the data by joining the "clicks" and "checkouts" streams from Kafka within a one-hour window.
- The results are written to the PostgreSQL sink.
│ └── main.py
main.py: Creates the source and sink tables and executes the data processing job.
For visualization using Grafana, access localhost:3000 (username `admin` and password `admin`).
After accessing Grafana with the provided credentials, choose the "Ecommerce Monitoring" dashboard for viewing.
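As an illustration of the kind of panel query the dashboard can run against the PostgreSQL sink, the hedged sketch below counts attributed checkouts per click source over the last hour. The actual panel definitions live in grafana/provisioning/dashboards/dashboard.json, and the table and column names here are assumptions.

```sql
-- Hypothetical Grafana panel query: attributed checkouts per source in the last hour.
SELECT
    click_source,
    COUNT(*) AS attributed_checkouts
FROM checkout_attribution
WHERE checkout_timestamp >= NOW() - INTERVAL '1 hour'
GROUP BY click_source
ORDER BY attributed_checkouts DESC;
```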
I would like to express my gratitude to the author of the article "Data Engineering Project for Beginners: Stream Edition" on startdataengineering.com for providing valuable guidance and inspiration. This project drew upon the concepts and methodologies outlined in that article, which served as a foundational resource for learning and implementation.