Skip to content

Commit

Permalink
Merge pull request #52 from neuroforgede/2.1.0
Browse files Browse the repository at this point in the history
remedy issue where we can accidentally DoS the event queue ourselves …
  • Loading branch information
s4ke authored Aug 28, 2023
2 parents b428cc2 + f759b7a commit 41a36c9
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions skipper/skipper/dataseries/tasks/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from opentelemetry import trace # type: ignore
from django.db import transaction
from random import shuffle
from skipper.core.celery import task
from skipper.core.models.tenant import Tenant

Expand Down Expand Up @@ -57,7 +58,7 @@ def consumer_heartbeat(queue: BaseTenantQueue, job: TenantPostgresQueueJob) -> A
consumer.tenant.id,
consumer.id,
],
expires=60
expires=360
)


Expand Down Expand Up @@ -95,14 +96,16 @@ def actual_run_heartbeat_consumers(tenant_id: str, consumer_id: str) -> None:
# run this in celery every x seconds to wake up the event queue beat
@task(name="_3_wake_up_heartbeat_consumers", queue='event_queue', ignore_result=True) # type: ignore
def wake_up_heartbeat_consumers() -> None:
for dataseries_consumer in DataSeries_Consumer.objects.all().filter(
dataseries_consumers = list(DataSeries_Consumer.objects.all().filter(
tenant__deleted_at__isnull=True,
tenant__id__isnull=False
):
))
shuffle(dataseries_consumers)
for dataseries_consumer in dataseries_consumers:
actual_run_heartbeat_consumers.apply_async(
args=[
dataseries_consumer.tenant_id,
dataseries_consumer.consumer_id,
],
expires=60
)
expires=360
)

0 comments on commit 41a36c9

Please sign in to comment.