A full-stack NoSQL Knowledge Graph (KG) system for academic papers, built using MongoDB Atlas, Kafka, FastAPI, and Python. This project ingests papers, normalizes data, builds KG nodes & edges, and provides API endpoints for querying and traversing the KG.
- Project Structure
- Environment Setup
- MongoDB Atlas Setup
- Kafka Setup
- Running the Pipeline
- API
- Sharding Notes
- Troubleshooting
## Project Structure

```
NOSQL/
├── api/                      # FastAPI backend
│   ├── routes/               # API routes
│   │   ├── nodes.py
│   │   ├── edges.py
│   │   ├── search.py
│   │   ├── search_embeddings.py
│   │   └── traverse.py
│   ├── db.py                 # MongoDB connection
│   ├── main.py               # FastAPI app entry
│   └── models.py             # Pydantic models
├── ingestion/                # Data ingestion and Kafka
│   ├── pdf_parser.py
│   ├── data_normalizer.py
│   ├── kafka_producer.py
│   ├── kafka_pdf_producer.py
│   ├── kafka_consumer_kg.py
│   ├── kafka_mongo_consumer.py
│   └── kafka_api_fetcher.py
├── kg_builder/               # KG builder scripts
│   ├── kg_builder.py
│   └── kg_edge_builder.py
├── mongo-init-scripts/       # Optional MongoDB init scripts
├── nosqlenv/                 # Python virtual environment
├── samples/                  # Example papers / PDFs
├── .env                      # Environment variables
└── docker-compose.yml        # Optional Docker setup
```
## Environment Setup

- Create a virtual environment:

  ```bash
  python -m venv nosqlenv
  ```

- Activate the environment:

  ```bash
  # Windows
  nosqlenv\Scripts\activate

  # Linux/Mac
  source nosqlenv/bin/activate
  ```

- Install dependencies:

  ```bash
  pip install -r requirements.txt
  ```

- Configure environment variables in the `.env` file:

  ```env
  MONGODB_USER=<username>         # sample: nosql_db
  MONGODB_PASS=<password>         # sample: nosql_db
  MONGODB_CLUSTER=<cluster_name>  # sample: nosql.vojsy9y.mongodb.net
  MONGODB_DB=<database_name>      # sample: NOSQL
  KAFKA_BOOTSTRAP=localhost:9092
  ```
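For reference, a minimal sketch of how these variables can be loaded and assembled into an Atlas connection string (assuming the `python-dotenv` package; the actual loading code in `api/db.py` may differ):

```python
import os
from dotenv import load_dotenv
from pymongo import MongoClient

load_dotenv()  # reads .env from the current working directory

# Build the Atlas SRV connection string from the .env values above
uri = (
    f"mongodb+srv://{os.environ['MONGODB_USER']}:{os.environ['MONGODB_PASS']}"
    f"@{os.environ['MONGODB_CLUSTER']}/?retryWrites=true&w=majority"
)

client = MongoClient(uri)
db = client[os.environ["MONGODB_DB"]]
```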
## MongoDB Atlas Setup

- Create a MongoDB Atlas cluster (M30 or higher if you plan to enable sharding; shared tiers do not support it).
- Create a user (`nosql_db`) with readWrite permissions on the `NOSQL` database.
- Whitelist your IP in Atlas network access.
- Update `.env` with your credentials.
Test the connection:

```python
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

# Replace <username> and <password> with your Atlas credentials
uri = "mongodb+srv://<username>:<password>@nosql.vojsy9y.mongodb.net/?retryWrites=true&w=majority"
client = MongoClient(uri, server_api=ServerApi('1'))

try:
    # Ping the deployment to confirm the connection and authentication work
    client.admin.command('ping')
    print("✅ Successfully connected and authenticated!")
except Exception as e:
    print("❌ Connection failed:", e)
```
## Kafka Setup

- Install Kafka and ZooKeeper locally or via Docker.
- Start ZooKeeper:

  ```bash
  zookeeper-server-start.sh config/zookeeper.properties
  ```

- Start the Kafka broker:

  ```bash
  kafka-server-start.sh config/server.properties
  ```

- Create topics:

  ```bash
  kafka-topics.sh --create --topic raw_papers --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
  ```
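To verify the broker and topic end to end, a minimal smoke-test producer (assuming the `kafka-python` package; the message shape here is illustrative, not the pipeline's actual schema):

```python
import json
from kafka import KafkaProducer

# Connect to the local broker and JSON-encode message values
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Send a throwaway test message to the raw_papers topic
producer.send("raw_papers", {"title": "test paper", "source": "smoke-test"})
producer.flush()  # block until delivery is confirmed
print("Message delivered to raw_papers")
```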
## Running the Pipeline

```bash
python pipeline_runner.py
```

- Starts the Kafka producer and consumer threads.
- Monitors the MongoDB `papers` collection and triggers the KG builder (one way to picture this step is sketched below).
- Logs show ingestion, normalization, and KG updates.
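A hypothetical illustration of the monitoring step using a MongoDB change stream (`pipeline_runner.py` may use a different mechanism, such as polling):

```python
from pymongo import MongoClient

client = MongoClient("<your Atlas URI>")  # placeholder; see the connection sketch above
db = client["NOSQL"]

# React to each newly inserted paper (change streams require a replica set, which Atlas provides)
with db["papers"].watch([{"$match": {"operationType": "insert"}}]) as stream:
    for change in stream:
        paper = change["fullDocument"]
        print("New paper ingested:", paper.get("title"))
        # the real pipeline would invoke the KG builder here
```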
Collections created automatically:

- `papers`
- `kg_nodes`
- `kg_edges`
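For orientation, plausible shapes for the KG documents. These are hypothetical: only `id`, `source`, and `target` are confirmed fields (the sharding section below keys on them); every other field is illustrative:

```python
# Hypothetical kg_nodes document
node = {
    "id": "paper:1234.5678",      # shard key field
    "type": "paper",
    "label": "Some Paper Title",
}

# Hypothetical kg_edges document
edge = {
    "source": "paper:1234.5678",  # shard key field
    "target": "author:jane-doe",  # shard key field
    "relation": "written_by",
}
```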
## API

Start the server:

```bash
uvicorn api.main:app --reload
```

- Available at: http://127.0.0.1:8000
Endpoints:

- Nodes: `/nodes`
- Edges: `/edges`
- Search by text: `/search`
- Search embeddings: `/search_embeddings`
- Traverse KG: `/traverse`
Test via browser or Postman.
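Or from Python, a quick sketch using `requests` (it assumes the routes accept GET requests, and the `q` parameter name is a guess; check the route definitions in `api/routes/` for the actual signatures):

```python
import requests

BASE = "http://127.0.0.1:8000"

# List KG nodes
resp = requests.get(f"{BASE}/nodes")
resp.raise_for_status()
print(resp.json())

# Text search; the `q` parameter name is an assumption
resp = requests.get(f"{BASE}/search", params={"q": "graph neural networks"})
print(resp.json())
```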
## Sharding Notes

⚠️ Only possible on dedicated clusters (M30 or higher), not the free or shared tiers (M0/M2/M5).

- Enable sharding via the Atlas UI: navigate to Clusters → Collections → NOSQL → Collection → Shard Collection.
- Choose shard keys:
  - `papers`: `"id"` (hashed)
  - `kg_nodes`: `"id"` (hashed)
  - `kg_edges`: `{ "source": 1, "target": 1 }`
## Troubleshooting

- SSL errors connecting to Atlas
  - Ensure Python's OpenSSL is >= 3.0.
  - Use the correct MongoDB URI format with `mongodb+srv://`.
- Authentication errors
  - Check that the `.env` credentials match the Atlas user.
  - Ensure the user has readWrite on the `NOSQL` database.
- Kafka connection issues
  - Verify the broker is running and the topic exists.
  - Check `bootstrap_servers` (`KAFKA_BOOTSTRAP` in `.env`).
- Module import errors
  - Run scripts from the project root, e.g.:

    ```bash
    python -m api.main
    ```