Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f889e36
feat: implement self-correcting CHIME model and federated learning
anish1206 Jan 30, 2026
aa504b6
Update dreamsApp/app/dashboard/main.py
anish1206 Jan 30, 2026
eaa4aae
Update dreamsApp/app/dashboard/main.py
anish1206 Jan 30, 2026
7e579e5
Update dreamsApp/app/fl_worker.py
anish1206 Jan 30, 2026
616211f
Update tests/test_fl.py
anish1206 Jan 30, 2026
f008349
Update dreamsApp/app/dashboard/main.py
anish1206 Jan 30, 2026
b27f8fb
Update dreamsApp/app/utils/sentiment.py
anish1206 Jan 30, 2026
8e8e5be
applied suggestions as per the code reviewer
anish1206 Jan 30, 2026
5629c40
Update dreamsApp/app/utils/logger.py
anish1206 Jan 30, 2026
3ffe6d9
Update dreamsApp/app/utils/sentiment.py
anish1206 Jan 30, 2026
5b0e5be
Update tests/test_fl.py
anish1206 Jan 30, 2026
c20a461
Update dreamsApp/app/fl_worker.py
pradeeban Jan 31, 2026
884de2b
Update dreamsApp/app/dashboard/main.py
pradeeban Jan 31, 2026
a059dea
Update dreamsApp/app/utils/logger.py
pradeeban Jan 31, 2026
9d267d4
Update dreamsApp/app/dashboard/main.py
pradeeban Jan 31, 2026
a41abca
Update tests/test_fl.py
pradeeban Jan 31, 2026
79fbfea
Update dreamsApp/app/utils/sentiment.py
anish1206 Feb 1, 2026
047eba7
feat(fl): add queue-triggered FL, atomic lock, validation, rate-limit…
anish1206 Feb 1, 2026
415c983
Update dreamsApp/app/dashboard/main.py
anish1206 Feb 1, 2026
be2f0e5
exp/bert-improv final 1
anish1206 Feb 1, 2026
54935ba
final 1.1
anish1206 Feb 1, 2026
e98db87
Update dreamsApp/app/dashboard/main.py
anish1206 Feb 1, 2026
8a6036a
Update dreamsApp/app/fl_worker.py
anish1206 Feb 1, 2026
190ba9b
Update dreamsApp/app/fl_worker.py
anish1206 Feb 1, 2026
7221c38
Update dreamsApp/app/utils/sentiment.py
anish1206 Feb 1, 2026
932e736
Update dreamsApp/app/dashboard/main.py
anish1206 Feb 1, 2026
b5ec68b
final 1.2
anish1206 Feb 1, 2026
a986427
Update dreamsApp/app/fl_worker.py
pradeeban Feb 1, 2026
5f05c0a
Update dreamsApp/app/dashboard/main.py
pradeeban Feb 1, 2026
4d0b42c
Update dreamsApp/app/dashboard/main.py
pradeeban Feb 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,7 @@ cython_debug/
# Virtual environments
venv310/
venv/

# Federated Learning Models
dreamsApp/app/models/production_chime_model/
dreamsApp/app/models/temp_training_artifact/
173 changes: 167 additions & 6 deletions dreamsApp/app/dashboard/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,20 @@
import numpy as np
import io
import base64
from flask_login import login_required
import threading
from flask_login import login_required, current_user
from wordcloud import WordCloud
from ..utils.llms import generate
from flask import jsonify
import datetime
from bson.objectid import ObjectId
from bson.errors import InvalidId

# Security: Whitelist of valid CHIME labels
VALID_CHIME_LABELS = {'Connectedness', 'Hope', 'Identity', 'Meaning', 'Empowerment', 'None'}

# Security: Rate limiting configuration
MAX_CORRECTIONS_PER_HOUR = 10

def generate_wordcloud_b64(keywords, colormap):
"""Refactor: Helper to generate base64 encoded word cloud image."""
Expand Down Expand Up @@ -113,9 +123,13 @@ def profile(target):
chime_lookup = {k.lower(): k for k in chime_counts}

for post in user_posts:
if post.get('chime_analysis'):
label = post['chime_analysis'].get('label', '').lower()
original_key = chime_lookup.get(label)
# Prioritize user correction if available
label_to_use = post.get('corrected_label')
if not label_to_use and post.get('chime_analysis'):
label_to_use = post['chime_analysis'].get('label', '')

if label_to_use:
original_key = chime_lookup.get(label_to_use.lower())
if original_key:
chime_counts[original_key] += 1

Expand Down Expand Up @@ -177,7 +191,20 @@ def profile(target):
wordcloud_positive_data = generate_wordcloud_b64(positive_keywords, 'GnBu')
wordcloud_negative_data = generate_wordcloud_b64(negative_keywords, 'OrRd')

return render_template('dashboard/profile.html', plot_url=plot_data, chime_plot_url=chime_plot_data, positive_wordcloud_url=wordcloud_positive_data, negative_wordcloud_url=wordcloud_negative_data, thematics=thematics,user_id=str(target_user_id))
# Sort posts to get the latest one
# The user_posts list is already sorted by timestamp ascending. The latest post is the last one.
latest_post = user_posts[-1] if user_posts else None

return render_template(
'dashboard/profile.html',
plot_url=plot_data,
chime_plot_url=chime_plot_data,
positive_wordcloud_url=wordcloud_positive_data,
negative_wordcloud_url=wordcloud_negative_data,
thematics=thematics,
user_id=str(target_user_id),
latest_post=latest_post # Pass only the latest post for feedback
)

@bp.route('/clusters/<user_id>')
@login_required
Expand Down Expand Up @@ -226,4 +253,138 @@ def thematic_refresh(user_id):
return jsonify({
"success": False,
"message": str(e)
}), 500
}), 500

@bp.route('/correct_chime', methods=['POST'])
@login_required
def correct_chime():
data = request.get_json()
post_id = data.get('post_id')
corrected_label = data.get('corrected_label')

if not all([post_id, corrected_label]):
return jsonify({'success': False, 'error': 'Missing fields'}), 400

# SECURITY: Validate ObjectId format
try:
post_object_id = ObjectId(post_id)
except (InvalidId, TypeError):
return jsonify({'success': False, 'error': 'Invalid post ID format'}), 400

# SECURITY: Validate label is in allowed set
if corrected_label not in VALID_CHIME_LABELS:
return jsonify({'success': False, 'error': 'Invalid label value'}), 400

mongo = current_app.mongo['posts']

# SECURITY: Rate limiting - max corrections per user per hour
one_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
recent_corrections = mongo.count_documents({
'user_id': current_user.get_id(),
'correction_timestamp': {'$gte': one_hour_ago}
})

if recent_corrections >= MAX_CORRECTIONS_PER_HOUR:
return jsonify({'success': False, 'error': 'Rate limit exceeded. Try again later.'}), 429

# Step 1: ALWAYS save the correction to the queue first
now = datetime.datetime.utcnow()
result = mongo.update_one(
{'_id': post_object_id, 'user_id': current_user.get_id()},
{
'$set': {
'corrected_label': corrected_label, # Current correction
'is_fl_processed': False, # Added to queue
'correction_timestamp': now
},
'$push': {
# AUDIT TRAIL: Keep history of all corrections for auditing
'correction_history': {
'label': corrected_label,
'timestamp': now,
'user_id': current_user.get_id()
}
}
}
)

if result.modified_count > 0:
# Step 2: Check if we should trigger training (non-blocking)
_maybe_trigger_fl_training(current_app._get_current_object())
return jsonify({'success': True})
else:
return jsonify({'success': False, 'error': 'Post not found or no change'}), 404


def _maybe_trigger_fl_training(app):
"""
Check queue size and trigger training if threshold is met.
Uses atomic database lock to ensure only ONE training runs at a time.
If lock is busy, the correction is already saved - it will be processed next round.
"""
FL_BATCH_SIZE = app.config.get('FL_BATCH_SIZE', 50)
LOCK_TIMEOUT_HOURS = 2 # If lock is older than this, assume it's stale

with app.app_context():
mongo = app.mongo

# Quick count check
pending_count = mongo['posts'].count_documents({
'corrected_label': {'$exists': True},
'is_fl_processed': False
})

if pending_count < FL_BATCH_SIZE:
return # Not enough corrections yet, exit quickly

# Try to acquire atomic lock
# Only ONE request can successfully flip is_running from False to True
lock_collection = mongo['fl_training_lock']

# Ensure lock document exists (first-time setup)
lock_collection.update_one(
{'_id': 'singleton'},
{'$setOnInsert': {'is_running': False}},
upsert=True
)

# SECURITY: Check for stale lock (stuck for more than LOCK_TIMEOUT_HOURS)
stale_threshold = datetime.datetime.utcnow() - datetime.timedelta(hours=LOCK_TIMEOUT_HOURS)
lock_collection.update_one(
{'_id': 'singleton', 'is_running': True, 'started_at': {'$lt': stale_threshold}},
{'$set': {'is_running': False, 'stale_reset_at': datetime.datetime.now()}}
)

# Atomically try to acquire lock
lock_result = lock_collection.find_one_and_update(
{'_id': 'singleton', 'is_running': False},
{'$set': {'is_running': True, 'started_at': datetime.datetime.now()}},
return_document=False # Return the OLD document
)

if lock_result is None or lock_result.get('is_running', True):
# Lock is busy - another training is running
# Our correction is already saved in queue, it will be processed next round
return

# We got the lock! Start training in background thread
def run_training_with_lock():
# Wrap entire function in app_context since this runs in a separate thread
with app.app_context():
try:
# Import here to avoid circular dependency (fl_worker imports create_app)
from dreamsApp.app.fl_worker import run_federated_round
run_federated_round()
except Exception as e:
# Log the error since daemon threads fail silently
import logging
logging.error(f"FL Training failed in background thread: {str(e)}", exc_info=True)
finally:
# Always release lock when done (success or failure)
mongo['fl_training_lock'].update_one(
{'_id': 'singleton'},
{'$set': {'is_running': False, 'finished_at': datetime.datetime.now()}}
)

thread = threading.Thread(target=run_training_with_lock, daemon=True)
thread.start()
Loading