Twitter Producer

Twitter Producer fetches tweets from Twitter that match a set of keywords and publishes them to a Kafka topic. A Spark Streaming job consumes the topic and sinks the data into MongoDB, and a Flask API service then serves the required data from the database server.
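The first stage of the pipeline (filter tweets by keyword, then turn each match into a Kafka message value) can be sketched as below. This is a minimal illustration using only the standard library, not the project's actual producer code: the `KEYWORDS` tuple, the tweet dictionaries, and the helper names `matches_keywords` and `to_kafka_value` are all assumptions made for the example. Sending the resulting bytes to a topic would be done with a Kafka client library, which is left out here.

```python
import json

# Hypothetical keyword list; the project's real keywords are not shown in this README.
KEYWORDS = ("kafka", "spark")


def matches_keywords(text, keywords=KEYWORDS):
    """Return True if the tweet text mentions any keyword (case-insensitive)."""
    lowered = text.lower()
    return any(keyword.lower() in lowered for keyword in keywords)


def to_kafka_value(tweet):
    """Serialize a tweet dict to UTF-8 JSON bytes, a common shape for a Kafka message value."""
    return json.dumps(tweet, ensure_ascii=False).encode("utf-8")


# Made-up sample tweets, for illustration only.
tweets = [
    {"id": 1, "text": "Learning Kafka today"},
    {"id": 2, "text": "Nothing relevant here"},
]

# Keep only matching tweets and serialize them.
messages = [to_kafka_value(t) for t in tweets if matches_keywords(t["text"])]
print(messages)
```

Keeping filtering and serialization as plain functions makes them easy to unit test without a running broker.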

Basic Architecture


Tech stack

  • Python version: 3.10.1
  • IDE: PyCharmCE
  • MongoDB Community distribution
  • Kafka Distribution
  • PySpark - Spark Streaming
  • GUI tool: MongoDB Compass
  • Flask
  • Postman - API Testing tool

Installation of Required Services

Kafka (Brew)

# Prerequisite for kafka
brew install java
# Install kafka (can take up to 5 minutes)
brew install kafka
# List services
brew services list
# You may need to uncomment the last two lines shown below
vi /opt/homebrew/etc/kafka/
  • Change the following in the file /opt/homebrew/etc/kafka/
# The address the socket server listens on. It will get the value returned from
# if not configured.
#     listeners = listener_name://host_name:port
#  listeners =PLAINTEXT://
# Start the services
brew services start zookeeper
brew services start kafka
# Create kafka topic
kafka-topics --create --topic <topic_name> --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4
# Create producer console
kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic
> hello-world
> I am twitter producer
# Create consumer console in another terminal
kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning
I am twitter producer
# Delete a kafka topic
kafka-topics --bootstrap-server localhost:9092 --delete --topic <topic_name>
# List all topics
kafka-topics --list --bootstrap-server localhost:9092
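Before running the producer, it can help to confirm from Python that something is listening on the broker port. The following is a small sketch using only the standard library; `is_port_open` is a hypothetical helper name, and the check only proves that a TCP connection succeeds, not that the listener is a healthy Kafka broker.

```python
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # 9092 is the bootstrap-server port used in the commands above.
    print("Kafka reachable:", is_port_open("localhost", 9092))
```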


MongoDB (Brew)

brew tap mongodb/brew
brew install mongodb-community
brew services start mongodb-community

MongoDB Compass

Download Link

Download NLTK

python3 utils/

To Run

  • Run the app/producer Python file (Twitter to Kafka)
python3 app/
  • Start the consumer service with spark-submit (Kafka to MongoDB with Spark Streaming)
python3 app/
spark-submit --class demo --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
  • Correct Country
python3 utils/
  • Start the server (Flask API on top of the MongoDB server)
python3 server/
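The `--packages` option of spark-submit takes a comma-separated list of Maven coordinates, which is easy to mistype by hand. The sketch below assembles the command from a Python list. It is an illustration, not part of the project: `build_spark_submit` is a hypothetical helper and `consumer_job.py` is a placeholder, since the real script name is not given here. It also leaves out `--class`, which spark-submit documents as applying to Java and Scala applications.

```python
import shlex

# Maven coordinates copied from the spark-submit command above.
PACKAGES = [
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0",
    "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1",
]


def build_spark_submit(script, packages=PACKAGES):
    """Return the spark-submit argument list for a PySpark script."""
    return ["spark-submit", "--packages", ",".join(packages), script]


# "consumer_job.py" is a placeholder, not the project's real script name.
command = build_spark_submit("consumer_job.py")
print(shlex.join(command))
```

The returned list can be passed directly to `subprocess.run` if the job should be launched from Python.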

To Test

Run the Python tests in tests:
- Flask Server
- Mongo Connection
Set up the test configuration within the IDE.

Airflow: Docker commands

  • Download the Airflow Docker Compose file
curl -LfO ''
  • Build an image
docker build . --tag extending_airflow:latest
  • Start the Airflow components
docker-compose up -d
  • Stop and remove the Airflow components
docker-compose down
  • Open a shell in a container
docker exec -it <container_id> bash