Pathway Hackathon Submission - Track 2: Logistics Pulse Copilot
A real-time AI-powered logistics and finance document processing system that detects anomalies, ensures compliance, and provides instant insights using Pathway's streaming ETL pipeline.
In logistics operations, critical updates happen every few minutes:
- 8:07 AM: Driver safety status changes from "Low" to "High risk"
- 8:12 AM: Finance publishes new payout rules with updated rates
- 8:18 AM: Shipment scan flags "Exception: package missing"
The Challenge: If these updates don't surface instantly, bad decisions follow. Unsafe drivers stay on the road, wrong rates get quoted, and customers wait in the dark.
Our Solution: A real-time RAG application that watches live data sources, indexes every new record through Pathway, and proves its currency with instant, up-to-date responses.
- Core Engine: Pathway framework handles all data ingestion and processing
- Real-Time Processing: Continuously ingests from file directories, APIs, and webhooks
- Streaming Pipeline: backend/pipeline/pathway_ingest.py implements the backbone ETL
- On-the-Fly Integration: New data indexed automatically without manual reloads
- Real-Time Updates: Data changes flow through to answers immediately
- No Manual Rebuilds: Pathway's incremental processing eliminates rebuild needs
- Multiple Interfaces: FastAPI endpoints, React frontend, and direct API access
- Real-Time Responses: Answers reflect latest data within seconds
- Live Updates: T+0 data changes included in T+1 queries
- Before/After Proof: System designed to showcase live update flow
- Real-Time Demo: Add file → trigger update → see new answers immediately
- Hackathon Validation: Built-in demo endpoints for judges
```
┌──────────────────┐    ┌──────────────────┐    ┌──────────────────┐
│   Data Sources   │    │  Pathway Engine  │    │   AI Interface   │
│                  │    │                  │    │                  │
│ • CSV Files      │───►│ • Streaming ETL  │───►│ • FastAPI        │
│ • PDF Docs       │    │ • Dynamic Index  │    │ • React UI       │
│ • API Feeds      │    │ • Anomaly Engine │    │ • RAG Queries    │
│ • Webhooks       │    │ • Vector Stores  │    │ • Live Updates   │
└──────────────────┘    └──────────────────┘    └──────────────────┘
```
- Ingestion: Pathway monitors the ./data/ directories for new files (see the sketch after this list)
- Processing: Streaming ETL extracts, transforms, and indexes data
- Anomaly Detection: AI engine flags suspicious patterns in real-time
- Vector Indexing: Documents automatically added to searchable stores
- Query Response: Users get answers reflecting latest data instantly
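As a sketch of step 1, this is the Pathway pattern the pipeline builds on: a directory is read in streaming mode, so every file dropped into it becomes an incremental update. The schema fields and output path below are illustrative assumptions, not the repository's actual definitions:

```python
# Minimal Pathway ingestion sketch (illustrative schema and paths).
import pathway as pw

class ShipmentSchema(pw.Schema):
    shipment_id: str
    status: str
    route: str

# Streaming mode: Pathway keeps watching the directory and emits an
# incremental update whenever a new CSV file appears.
shipments = pw.io.fs.read(
    "./data/shipments",
    format="csv",
    schema=ShipmentSchema,
    mode="streaming",
)

# Downstream stages (anomaly detection, vector indexing) subscribe to this
# table; here the stream is simply persisted so the updates are visible.
pw.io.jsonlines.write(shipments, "./data/out/shipments.jsonl")

# Runs the dataflow and keeps processing new files as they arrive.
pw.run()
```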
For Hackathon Judges: Here's the fastest way to see the system in action!
```powershell
# Clone and start (all-in-one command) - Windows
git clone <repository-url> && cd logistics-pulse-copilot && .\start.ps1
```

```bash
# Clone and start (all-in-one command) - macOS/Linux
git clone <repository-url> && cd logistics-pulse-copilot && chmod +x start.sh && ./start.sh
```

```bash
# 1. Setup
pip install -r requirements.txt && python setup_enhanced.py

# 2. Start backend
python backend/main_enhanced.py &

# 3. Test it works
curl http://localhost:8000/api/status
```

Verification: Visit http://localhost:8000/docs to see the live API, or http://localhost:3000 for the frontend UI.
- Python 3.8+
- Node.js 16+ (for frontend)
- Git
```bash
git clone https://github.com/your-username/logistics-pulse-copilot.git
cd logistics-pulse-copilot

# Install Python dependencies
pip install -r requirements.txt

# Set up environment
cp .env.example .env

# Initialize data directories
python setup_enhanced.py
```

```powershell
# Windows
.\start.bat
```

```bash
# macOS/Linux
./start.sh
```

```bash
# Start backend
cd backend
python main_enhanced.py

# Start frontend (new terminal)
cd frontend
npm install
npm start
```

- Backend API: http://localhost:8000
- Frontend UI: http://localhost:3000
- API Documentation: http://localhost:8000/docs
- System Status: http://localhost:8000/api/status
- Initial Query: Ask "What high-risk shipments do we have?"
- Add New Data: Drop a CSV with anomalies into ./data/uploads/
- Watch Magic: Same query now returns updated results instantly (the sketch below scripts this flow)
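A minimal sketch of this flow as a script, assuming the backend is already running on port 8000; the sample CSV name is a placeholder, and the sleep is only there to give the streaming pipeline a moment to index the new file:

```python
# Before/after demo: same question, asked before and after new data arrives.
import json
import shutil
import time
import urllib.request

API = "http://localhost:8000/api/query"

def ask(message: str) -> str:
    """POST a natural-language question to /api/query and return the raw body."""
    req = urllib.request.Request(
        API,
        data=json.dumps({"message": message}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return resp.read().decode("utf-8")

question = "What high-risk shipments do we have?"

# 1. Ask with the data the system already has.
print("Before:", ask(question))

# 2. Drop a new CSV into the watched directory (placeholder file name).
shutil.copy("high_risk_shipments.csv", "./data/uploads/")

# 3. Give the streaming pipeline a moment, then ask the same question again.
time.sleep(5)
print("After:", ask(question))
```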
```
# Check system status
GET /api/status

# Upload new document
POST /api/upload

# Query with real-time data
POST /api/query

# Trigger anomaly detection
POST /api/detect-anomalies

# Get current anomalies
GET /api/anomalies
```

```bash
curl -X POST "http://localhost:8000/api/query" \
  -H "Content-Type: application/json" \
  -d '{"message": "Are there any non-compliant invoices?"}'
```

```bash
curl -X POST "http://localhost:8000/api/query" \
  -H "Content-Type: application/json" \
  -d '{"message": "Show me shipments with route deviations"}'
```

```bash
# Update policy file, then query
curl -X POST "http://localhost:8000/api/query" \
  -H "Content-Type: application/json" \
  -d '{"message": "What are the current late fee rates?"}'
```

| Component | File | Purpose |
|---|---|---|
| Streaming ETL | backend/pipeline/pathway_ingest.py | Core Pathway pipeline for data processing |
| Pipeline Manager | backend/pipeline/pathway_manager.py | Controls and monitors Pathway operations |
| Real-Time RAG | backend/models/rag_model.py | Integrates Pathway with vector stores |
| Component | File | Purpose |
|---|---|---|
| Local LLM | backend/models/local_llm.py | Hugging Face model integration |
| Vector Stores | backend/models/rag_model.py | FAISS + Pathway vector indexing |
| Anomaly Detection | backend/pipeline/enhanced_anomaly_detector.py | Real-time anomaly flagging |
| Endpoint | Purpose | Real-Time Feature |
|---|---|---|
| /api/upload | Document ingestion (example below) | Immediate processing via Pathway |
| /api/query | Natural language queries | Latest data always included |
| /api/anomalies | Risk alerts | Real-time anomaly detection |
| /api/status | System health | Live pipeline monitoring |
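Documents can also be pushed through /api/upload instead of being dropped into a watched folder. A sketch, assuming the requests package is installed and that the endpoint accepts a multipart "file" field; the file name is a placeholder, so check http://localhost:8000/docs for the actual request schema:

```python
# Sketch: upload a document via the API (multipart field name is an assumption).
import requests

with open("invoice_101.csv", "rb") as f:  # placeholder file name
    resp = requests.post("http://localhost:8000/api/upload", files={"file": f})

print(resp.status_code, resp.text)
```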
- Real-Time Updates: Driver risk status changes trigger immediate alerts
- Example: "Driver Maya moved from Low to High risk - recommend reassignment"
- Data Sources: Safety files, incident reports, performance metrics
- Policy Tracking: System cross-checks invoices against up-to-date contract terms
- Example: "Invoice #234 is non-compliant: late-fee clause #4 now applies"
- Real-Time: Finance updates → instant policy application
- Live Monitoring: Real-time shipment feeds flag suspicious patterns
- Example: "Shipment #1027 shows significant route deviation - possible fraud"
- Instant Investigation: Pulls relevant policies and historical data immediately (a toy deviation check is sketched below)
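The actual rules live in backend/pipeline/enhanced_anomaly_detector.py; purely as an illustration of the kind of check involved, a route-deviation flag might look something like this (the field names and threshold are invented for the sketch):

```python
# Hypothetical illustration only -- the real rules live in
# backend/pipeline/enhanced_anomaly_detector.py. Field names and the
# threshold below are invented for this sketch.
from dataclasses import dataclass
from typing import Optional

@dataclass
class ShipmentScan:
    shipment_id: str
    planned_route_km: float
    actual_route_km: float

def flag_route_deviation(scan: ShipmentScan, threshold: float = 0.25) -> Optional[dict]:
    """Flag a shipment whose travelled distance exceeds the plan by more than
    `threshold` (25% by default)."""
    if scan.planned_route_km <= 0:
        return None
    deviation = (scan.actual_route_km - scan.planned_route_km) / scan.planned_route_km
    if deviation <= threshold:
        return None
    return {
        "shipment_id": scan.shipment_id,
        "type": "route_deviation",
        "severity": "high" if deviation > 2 * threshold else "medium",
        "deviation_pct": round(deviation * 100, 1),
    }
```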
```python
# Core streaming pipeline
import pathway as pw

class PathwayIngestPipeline:
    def build_pipeline(self):
        # 1. Input connectors for each data type
        invoices = pw.io.fs.read("./data/invoices", format="csv", mode="streaming")
        shipments = pw.io.fs.read("./data/shipments", format="csv", mode="streaming")

        # 2. Real-time processing
        processed_docs = self._process_documents(invoices, shipments)

        # 3. Anomaly detection
        anomalies = self._detect_anomalies(processed_docs)

        # 4. Vector indexing
        self._index_documents(processed_docs)
```

```python
# RAG model with Pathway integration
class LogisticsPulseRAG:
    def add_document_to_index(self, content, doc_type, metadata):
        if self.pathway_enabled:
            # Route through Pathway for real-time processing
            self._route_through_pathway(content, doc_type, metadata)
        # Also update local store for immediate access
        self._add_to_local_vector_store(content, doc_type, metadata)
```

```python
# Live retrieval with latest data
def process_query(self, query):
    # 1. Sync with Pathway's latest output
    self.sync_with_pathway_index()

    # 2. Hybrid retrieval (semantic + keyword)
    docs = self._hybrid_search(query)

    # 3. Generate response with fresh data
    return self._generate_response(query, docs)
```

- Latency: Sub-second query responses with real-time data (see the quick check below)
- Throughput: Handles hundreds of documents per minute
- Scalability: Pathway enables horizontal scaling
- Memory: Efficient vector store management with FAISS
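One quick way to sanity-check the latency figure locally (a rough sketch; it measures end-to-end HTTP time only and will vary with hardware and the local model in use):

```python
# Rough end-to-end latency check against the query endpoint.
import json
import time
import urllib.request

req = urllib.request.Request(
    "http://localhost:8000/api/query",
    data=json.dumps({"message": "What are the current late fee rates?"}).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)

start = time.perf_counter()
with urllib.request.urlopen(req) as resp:
    resp.read()
print(f"Round-trip: {time.perf_counter() - start:.2f}s")
```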
```bash
# Backend tests
python -m pytest backend/tests/

# Integration tests
python test_complete_workflow.py

# Real-time demo
python demo_causal_flow.py
```

```bash
# 1. Start system
python start_system.py

# 2. Upload test data
python test_upload_data.py

# 3. Verify real-time processing
python test_dashboard_update.py
```

| Format | Examples | Real-Time Processing |
|---|---|---|
| CSV | Invoices, shipments, driver data | ✅ Streaming ETL |
| PDF | Policies, contracts, reports | ✅ Text extraction + indexing |
| JSON | API feeds, webhook data | ✅ Direct processing |
| Markdown | Policy documents | ✅ Chunked indexing (sketch below) |
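For the Markdown row above, documents can also be pushed into the index programmatically through the add_document_to_index method shown in the Technical Implementation section. A minimal sketch, assuming a naive fixed-size chunking; the chunk size and metadata keys are placeholders, not the pipeline's actual behavior:

```python
# Sketch: index a Markdown policy document chunk by chunk.
# `rag` is an instance of LogisticsPulseRAG (see Technical Implementation);
# the chunk size and metadata keys are assumptions for this example.
from pathlib import Path

def index_policy(rag, path: str, chunk_size: int = 1000) -> int:
    text = Path(path).read_text(encoding="utf-8")
    # Naive fixed-size chunking; the real pipeline may chunk differently.
    chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    for n, chunk in enumerate(chunks):
        rag.add_document_to_index(
            content=chunk,
            doc_type="policy",
            metadata={"source": path, "chunk": n},
        )
    return len(chunks)
```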
Our demo video showcases:
- Initial State: System answers query with existing data
- Live Update: New document added to watched directory
- Pathway Processing: Real-time ETL pipeline processes new data
- Updated Response: Same query now includes new information
- Proof of Real-Time: Timestamps show sub-second updates
- LangGraph Integration: Multi-step reasoning workflows
- Agent Orchestration: Intelligent query routing and escalation
- REST API: /api/agents endpoint for agentic workflows
- Multi-modal Processing: Images, videos, audio files
- Webhook Integrations: Real-time API feeds
- Advanced Analytics: Predictive risk modeling
- Multi-tenant Support: Enterprise deployment ready
- Issues: Open GitHub issues for bugs or questions
- Discussions: Use GitHub Discussions for feature requests
- Documentation: Check the docs/ folder for detailed guides
- ✅ Working Prototype: Fully functional system with real-time updates
- ✅ Code Repository: Complete source code with clear documentation
- ✅ Pathway Integration: Core streaming ETL using Pathway framework
- ✅ Dynamic Indexing: No manual rebuilds required
- ✅ Live Interface: API and UI for real-time queries
- ✅ Demo Ready: Built-in demonstration capabilities
- ✅ Setup Instructions: Clear installation and running guide
MIT License
Copyright (c) 2025 Logistics Pulse Copilot
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Built for the Pathway Hackathon 2025
Demonstrating the power of real-time RAG for logistics operations