revert sse
cryptobench committed Oct 12, 2024
1 parent ff6f031 commit 82aba96
Showing 2 changed files with 30 additions and 69 deletions.
89 changes: 22 additions & 67 deletions stats-backend/api2/tasks.py
@@ -1825,37 +1825,18 @@ def extract_wallets_and_ids():
 from django.db import transaction
 from django.db.models import Case, When, Value, BooleanField
 from .models import NodeStatusHistory, Node
-from django.db import IntegrityError
 
 @app.task
 def bulk_update_node_statuses(nodes_data):
     status_history_to_create = []
     redis_updates = {}
     offline_nodes = []
     online_nodes = []
+    nodes_to_create = []
     for node_id, is_online in nodes_data:
         latest_status = r.get(f"provider:{node_id}:status")
 
         if latest_status is None or latest_status.decode() != str(is_online):
-            try:
-                node, created = Node.objects.get_or_create(
-                    node_id=node_id,
-                    defaults={'online': is_online}
-                )
-                if created:
-                    node.type = "requestor"
-                    node.save()
-            except IntegrityError:
-                # If creation fails due to race condition, try to get the object
-                try:
-                    node = Node.objects.get(node_id=node_id)
-                except Node.DoesNotExist:
-                    print(f"Node {node_id} not found")
-                    # If still not found, create a new object
-                    node = Node(node_id=node_id, online=is_online, type="requestor")
-                    node.save()
-            node.online = is_online
-            node.save()
             status_history_to_create.append(
                 NodeStatusHistory(node_id=node_id, is_online=is_online)
             )
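Note: this task receives a batch of (node_id, is_online) tuples, as the calls further down in this diff show. A minimal usage sketch, with placeholder node IDs rather than real Golem addresses:

    # Queue a batch of status changes for bulk_update_node_statuses (illustrative values only).
    bulk_update_node_statuses.delay([
        ("0x1111111111111111111111111111111111111111", True),   # reported online by the relay scan
        ("0x2222222222222222222222222222222222222222", False),  # missing from the relay scan
    ])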
@@ -1866,11 +1847,18 @@ def bulk_update_node_statuses(nodes_data):
             else:
                 offline_nodes.append(node_id)
 
-    if status_history_to_create:
+        # Check if the node exists, if not, prepare to create it
+        if not Node.objects.filter(node_id=node_id).exists():
+            nodes_to_create.append(Node(node_id=node_id, online=is_online))
+
+    if status_history_to_create or nodes_to_create:
         with transaction.atomic():
             NodeStatusHistory.objects.bulk_create(status_history_to_create)
 
-            # Efficiently update Node objects for offline nodes
+            # Create new nodes if any
+            Node.objects.bulk_create(nodes_to_create, ignore_conflicts=True)
+
+            # Efficiently update Node objects for offline and online nodes
             Node.objects.filter(node_id__in=offline_nodes).update(online=False)
             Node.objects.filter(node_id__in=online_nodes).update(online=True)
     if redis_updates:
@@ -1879,52 +1867,10 @@ def bulk_update_node_statuses(nodes_data):
 
 
 from .utils import check_node_status
-import aiohttp
-import asyncio
-import json
-from celery.utils.log import get_task_logger
-from celery_singleton import Singleton
-
-
-@app.task(base=Singleton, bind=True, max_retries=None)
-def listen_for_relay_events(self):
-    try:
-        asyncio.run(event_listener())
-    except Exception as exc:
-        print(f"listen_for_relay_events task failed: {exc}")
-        self.retry(countdown=5, exc=exc)  # Retry after 5 seconds
-
-async def event_listener():
-    url = "http://yacn2.dev.golem.network:9000/events"
-    async with aiohttp.ClientSession() as session:
-        async with session.get(url) as resp:
-            async for line in resp.content:
-                if line:
-                    try:
-                        decoded_line = line.decode('utf-8').strip()
-                        if decoded_line.startswith('event:'):
-                            event_type = decoded_line.split(':', 1)[1].strip()
-                        elif decoded_line.startswith('data:'):
-                            node_id = decoded_line.split(':', 1)[1].strip()
-                            event = {'Type': event_type, 'Id': node_id}
-                            process_event(event)
-                    except Exception as e:
-                        print(f"Failed to process event: {e}")
-
-def process_event(event):
-    event_type = event.get('Type')
-    node_id = event.get('Id')
-
-    if event_type == 'new-node':
-        print(f"New node: {node_id}")
-        bulk_update_node_statuses.delay([(node_id, True)])
-    elif event_type == 'lost-node':
-        print(f"Lost node: {node_id}")
-        bulk_update_node_statuses.delay([(node_id, False)])
-
 @app.task
-def initial_relay_nodes_scan():
+def fetch_and_update_relay_nodes_online_status():
     base_url = "http://yacn2.dev.golem.network:9000/nodes/"
+    current_online_nodes = set()
     nodes_to_update = []
 
     for prefix in range(256):
@@ -1936,14 +1882,23 @@ def initial_relay_nodes_scan():
             for node_id, sessions in data.items():
                 node_id = node_id.strip().lower()
                 is_online = bool(sessions) and any('seen' in item for item in sessions if item)
+                current_online_nodes.add(node_id)
                 nodes_to_update.append((node_id, is_online))
 
         except requests.RequestException as e:
             print(f"Error fetching data for prefix {prefix:02x}: {e}")
 
     # Bulk update node statuses
     bulk_update_node_statuses.delay(nodes_to_update)
-    listen_for_relay_events.delay()
+
+    # Check providers that were previously online but not found in the current scan
+    previously_online = set(NodeStatusHistory.objects.filter(
+        is_online=True
+    ).order_by('node_id', '-timestamp').distinct('node_id').values_list('node_id', flat=True))
+
+    missing_nodes = previously_online - current_online_nodes
+    if missing_nodes:
+        check_missing_nodes.delay(list(missing_nodes))
 
 
 @app.task
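For reference, fetch_and_update_relay_nodes_online_status walks the relay's /nodes/ endpoint for every two-hex-digit prefix (00 through ff) and marks a node online when any of its listed sessions contains a 'seen' entry. A rough sketch of the response shape that this parsing implies, with made-up node IDs and timestamp (an inference from the code above, not the relay's documented schema):

    # Hypothetical body of GET http://yacn2.dev.golem.network:9000/nodes/0a
    data = {
        "0x0aaa...placeholder": [{"seen": "2024-10-12T10:00:00Z"}],  # counted as online
        "0x0abb...placeholder": [],                                  # counted as offline
    }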
10 changes: 8 additions & 2 deletions stats-backend/core/celery.py
@@ -75,11 +75,17 @@ def setup_periodic_tasks(sender, **kwargs):
         daily_volume_golem_vs_chain,
         computing_total_over_time,
         extract_wallets_and_ids,
-        initial_relay_nodes_scan,
+        fetch_and_update_relay_nodes_online_status,
     )
     v2_offer_scraper.apply_async(args=["ray-on-golem-heads"], queue="yagna", routing_key="yagna")
     v2_offer_scraper.apply_async(queue="yagna", routing_key="yagna")
-    initial_relay_nodes_scan.delay()
+
+    sender.add_periodic_task(
+        30,
+        fetch_and_update_relay_nodes_online_status.s(),
+        queue="default",
+        options={"queue": "default", "routing_key": "default"},
+    )
     sender.add_periodic_task(
         60,
         computing_total_over_time.s(),
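The sender.add_periodic_task call above registers the relay scan to run every 30 seconds via Celery beat, replacing the one-shot initial_relay_nodes_scan.delay() kickoff. A roughly equivalent static configuration, shown only as a sketch and assuming the task's registered name is api2.tasks.fetch_and_update_relay_nodes_online_status (the project may rely solely on setup_periodic_tasks):

    # Illustrative beat_schedule equivalent; the dotted task path is an assumption.
    app.conf.beat_schedule = {
        "fetch-relay-nodes-online-status": {
            "task": "api2.tasks.fetch_and_update_relay_nodes_online_status",
            "schedule": 30.0,
            "options": {"queue": "default", "routing_key": "default"},
        },
    }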
