This project demonstrates an end‑to‑end real-time analytics pipeline using PySpark Structured Streaming. The pipeline ingests user activity logs from Kafka, applies multiple levels of transformations and aggregations, and computes both basic and session-level metrics. It includes dynamic anomaly detection using moving averages and standard deviations, and flags unusual spikes in activity.
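To illustrate the idea (not the project's actual code in anomaly_detection.py), the moving-average rule can be sketched as a helper run inside a `foreachBatch` sink: count events per minute, compute the moving average and standard deviation over the previous buckets, and flag buckets that spike above the expected range. Column names (`timestamp`) and window sizes below are assumptions.

```python
# Illustrative sketch only -- the real logic lives in anomaly_detection.py.
# Intended to run inside foreachBatch, where batch_df is a static DataFrame.
from pyspark.sql import DataFrame, Window, functions as F

def flag_spikes(batch_df: DataFrame, batch_id: int, threshold: float = 3.0) -> DataFrame:
    # Events per 1-minute bucket within this micro-batch.
    per_minute = (
        batch_df
        .groupBy(F.window("timestamp", "1 minute").alias("bucket"))
        .agg(F.count("*").alias("event_count"))
    )
    # Moving average / standard deviation over the previous 10 buckets.
    w = Window.orderBy(F.col("bucket.start")).rowsBetween(-10, -1)
    scored = (
        per_minute
        .withColumn("moving_avg", F.avg("event_count").over(w))
        .withColumn("moving_std", F.stddev("event_count").over(w))
    )
    # A spike is a bucket exceeding the moving average by `threshold` sigmas.
    return scored.where(
        F.col("event_count") > F.col("moving_avg") + threshold * F.col("moving_std")
    )
```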
The processed data is written to multiple sinks:
- HDFS (in Parquet format) for long-term storage and batch processing
- Kafka for real-time alerts
- Console output for quick debugging
- Static reports and a rudimentary dashboard (in CSV and HTML formats) for business insights
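A minimal sketch of how the streaming sinks above can be wired up with `writeStream`; the DataFrame names (`metrics_df`, `alerts_df`), paths, topics, and checkpoint locations here are placeholders, not the values used in storage_writer.py or config.py.

```python
# Placeholder names and paths; see storage_writer.py / config.py for the real ones.

# 1. Long-term storage: append Parquet files to HDFS.
parquet_query = (
    metrics_df.writeStream
    .format("parquet")
    .option("path", "hdfs:///data/user_activity/metrics")
    .option("checkpointLocation", "hdfs:///checkpoints/metrics_parquet")
    .outputMode("append")
    .start()
)

# 2. Real-time alerts: serialize each anomaly row as JSON and publish to Kafka.
alerts_query = (
    alerts_df.selectExpr("to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "user_activity_alerts")
    .option("checkpointLocation", "hdfs:///checkpoints/alerts_kafka")
    .start()
)

# 3. Console sink for quick debugging.
console_query = (
    metrics_df.writeStream
    .format("console")
    .outputMode("update")
    .option("truncate", "false")
    .start()
)
```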
.
├── README.md
├── requirements.txt
├── config.py
├── log_parser.py # log parsing utilities and schema definitions
├── anomaly_detection.py # anomaly detection logic
├── metrics.py # metrics computation (basic & session-level)
├── storage_writer.py # functions to write output to HDFS, Kafka, console, etc.
├── dashboard.py # dashboard/report generation utilities
├── utilities.py # logging setup and helper functions
└── streaming_job.py # main PySpark streaming job
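For orientation, a hypothetical version of what log_parser.py provides: a schema for the JSON activity events and a helper that decodes the Kafka `value` column. The field names below are assumptions, not the project's actual schema.

```python
# Hypothetical schema and parser; the actual definitions live in log_parser.py.
from pyspark.sql import DataFrame, functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

ACTIVITY_SCHEMA = StructType([
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),   # e.g. "click", "view", "purchase"
    StructField("page", StringType()),
    StructField("timestamp", TimestampType()),
])

def parse_logs(raw_df: DataFrame) -> DataFrame:
    """Decode the Kafka `value` bytes as JSON and flatten to typed columns."""
    return (
        raw_df
        .select(F.from_json(F.col("value").cast("string"), ACTIVITY_SCHEMA).alias("log"))
        .select("log.*")
    )
```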
- Apache Spark: version 2.4+ with Structured Streaming support
- Kafka: a running Kafka cluster with the topic `user_activity_logs`
- HDFS or Cloud Object Store: for output storage
- Python 3.x
- Clone the repository:
  `git clone git@github.com:avrtt/user-activity-streaming-analytics.git`
  `cd user-activity-streaming-analytics`
- Install dependencies:
  `pip install -r requirements.txt`
- Configure connection parameters: edit `config.py` to adjust Kafka servers, topics, HDFS paths and anomaly detection thresholds as needed.
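The exact settings live in `config.py` itself; an illustrative layout (key names below are assumptions, apart from `REPORT_OUTPUT_PATH`, which is referenced later in this README) might look like:

```python
# Illustrative only -- consult config.py for the real settings.
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
KAFKA_INPUT_TOPIC = "user_activity_logs"
KAFKA_ALERT_TOPIC = "user_activity_alerts"

HDFS_OUTPUT_PATH = "hdfs:///data/user_activity/metrics"
CHECKPOINT_PATH = "hdfs:///checkpoints/user_activity"
REPORT_OUTPUT_PATH = "/tmp/user_activity_reports"

# Anomaly detection: flag activity that exceeds the moving average by more
# than this many standard deviations.
ANOMALY_STD_THRESHOLD = 3.0
MOVING_AVG_WINDOW = "10 minutes"
```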
To start the streaming job, run:
`spark-submit streaming_job.py`
Make sure Kafka is running and that the topic `user_activity_logs` is receiving data. The job processes logs in real time, computes metrics, detects anomalies, and writes the results to HDFS, Kafka and the console.
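Note that if the Structured Streaming Kafka source is not already on the cluster classpath, `spark-submit` typically needs the `spark-sql-kafka` connector supplied via `--packages`. For local testing, a small producer can feed the topic with synthetic events; the snippet below assumes the `kafka-python` package and the hypothetical schema shown earlier, neither of which is prescribed by the project.

```python
# Push synthetic test events into user_activity_logs for local development.
# Assumes kafka-python is installed; field names follow the hypothetical schema above.
import json
import random
import time

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for _ in range(100):
    producer.send("user_activity_logs", {
        "user_id": f"user_{random.randint(1, 10)}",
        "event_type": random.choice(["click", "view", "purchase"]),
        "page": f"/page/{random.randint(1, 5)}",
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
    })
producer.flush()
```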
The static reports and dashboard HTML generated by the project can be used with any BI tool for further analysis. The CSV and HTML files are written to the path specified by `REPORT_OUTPUT_PATH` in `config.py`.
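For example, the latest CSV report can be pulled into a notebook with pandas (an assumption here, not a stated project dependency):

```python
# Load the most recent CSV report for ad-hoc analysis; assumes pandas is
# installed and REPORT_OUTPUT_PATH points to a locally readable directory.
import glob
import os

import pandas as pd
from config import REPORT_OUTPUT_PATH

csv_files = sorted(glob.glob(os.path.join(REPORT_OUTPUT_PATH, "*.csv")))
latest_report = pd.read_csv(csv_files[-1])
print(latest_report.head())
```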
Licensed under the MIT License.