Skip to content

Commit

Permalink
Retry crop generator if failed (#127)
Browse files Browse the repository at this point in the history
* Handle run of crop generator request

* Handle crop generator to skip if still running or complete

* Update skip run handler

* Add retry crop plan generator every hour

* Update based on failed tests
  • Loading branch information
meomancer authored Sep 10, 2024
1 parent c4894fa commit 9a10cfc
Show file tree
Hide file tree
Showing 8 changed files with 429 additions and 43 deletions.
9 changes: 6 additions & 3 deletions django_project/core/celery.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Tomorrow Now GAP."""
from __future__ import absolute_import, unicode_literals

import os
import logging
import os

from celery import Celery, signals
from celery.result import AsyncResult
Expand All @@ -11,7 +11,6 @@
from celery.worker.control import inspect_command
from django.utils import timezone


logger = logging.getLogger(__name__)

# set the default Django settings module for the 'celery' program.
Expand Down Expand Up @@ -50,6 +49,10 @@
# Run everyday at 01:30 UTC or 04:30 EAT
'schedule': crontab(minute='30', hour='1'),
},
'retry-crop-plan-generators': {
'task': 'retry_crop_plan_generators',
'schedule': crontab(minute='0', hour='*'),
},
'salient-collector-session': {
'task': 'salient_collector_session',
# Run every Monday 02:00 UTC
Expand Down Expand Up @@ -297,7 +300,7 @@ def task_failure_handler(


@signals.task_revoked.connect
def task_revoked_handler(sender, request = None, **kwargs):
def task_revoked_handler(sender, request=None, **kwargs):
"""Handle a cancelled task.
:param sender: task sender
Expand Down
32 changes: 23 additions & 9 deletions django_project/core/models/background_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@
"""


import uuid
import logging
from traceback import format_tb
import uuid
from ast import literal_eval as make_tuple
from django.db import models
from django.utils.translation import gettext_lazy as _
from traceback import format_tb

from django.conf import settings
from django.db import models
from django.utils import timezone

from django.utils.translation import gettext_lazy as _

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -137,10 +136,25 @@ class BackgroundTask(models.Model):
blank=True
)

running_states = [
TaskStatus.PENDING, TaskStatus.QUEUED, TaskStatus.RUNNING
]

def __str__(self):
"""Get string representation."""
return str(self.uuid)

@staticmethod
def running_tasks():
"""Return all running tasks."""
return BackgroundTask.objects.filter(
status__in=BackgroundTask.running_states
)

def is_running(self):
"""Check if task is running."""
return self.status in BackgroundTask.running_states

@property
def requester_name(self):
"""Get the requester name."""
Expand Down Expand Up @@ -284,7 +298,7 @@ def task_on_retried(self, reason):
update_fields=['last_update', 'progress_text']
)

def is_possible_interrupted(self, delta = 1800):
def is_possible_interrupted(self, delta=1800):
"""Check whether the task is stuck or being interrupted.
This requires the task to send an update to BackgroundTask.
Expand All @@ -294,8 +308,8 @@ def is_possible_interrupted(self, delta = 1800):
:rtype: bool
"""
if (
self.status == TaskStatus.QUEUED or
self.status == TaskStatus.RUNNING
self.status == TaskStatus.QUEUED or
self.status == TaskStatus.RUNNING
):
# check if last_update is more than 30mins than current date time
if self.last_update:
Expand Down
30 changes: 22 additions & 8 deletions django_project/gap/admin/crop_insight.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class FarmCropVarietyAdmin(admin.ModelAdmin):
def generate_insight_report_action(modeladmin, request, queryset):
"""Generate insight report."""
for query in queryset:
generate_insight_report(query.id)
generate_insight_report.delay(query.id)
modeladmin.message_user(
request,
'Process will be started in background!',
Expand All @@ -116,18 +116,17 @@ def generate_insight_report_action(modeladmin, request, queryset):
class CropInsightRequestAdmin(admin.ModelAdmin):
"""Admin for CropInsightRequest."""

list_display = ('requested_at', 'farm_list', 'file_url')
list_display = (
'requested_at', 'farm_count', 'file_url', 'last_task_status',
'background_tasks'
)
filter_horizontal = ('farms',)
actions = (generate_insight_report_action,)
readonly_fields = ('file',)

def farm_list(self, obj: CropInsightRequest):
def farm_count(self, obj: CropInsightRequest):
"""Return farm list."""
return [farm.unique_id for farm in obj.farms.all()]

def file(self, obj: CropInsightRequest):
"""Return file path."""
return [farm.unique_id for farm in obj.farms.all()]
return obj.farms.count()

def file_url(self, obj):
"""Return file url."""
Expand All @@ -137,3 +136,18 @@ def file_url(self, obj):
f'target="__blank__">{obj.file.url}</a>'
)
return '-'

def last_task_status(self, obj: CropInsightRequest):
"""Return task status."""
bg_task = obj.last_background_task
if bg_task:
return bg_task.status
return None

def background_tasks(self, obj: CropInsightRequest):
"""Return ids of background tasks that are running."""
url = (
f"/admin/core/backgroundtask/?context_id__exact={obj.id}&"
f"task_name__in={','.join(CropInsightRequest.task_names)}"
)
return format_html(f'<a target="_blank" href={url}>link</a>')
100 changes: 97 additions & 3 deletions django_project/gap/models/crop_insight.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import json
import uuid
from datetime import date
from datetime import date, timedelta

from django.conf import settings
from django.contrib.auth import get_user_model
Expand All @@ -17,6 +17,7 @@
from django.utils import timezone

from core.group_email_receiver import crop_plan_receiver
from core.models.background_task import BackgroundTask, TaskStatus
from core.models.common import Definition
from gap.models import Farm
from gap.models.lookup import RainfallClassification
Expand Down Expand Up @@ -340,7 +341,6 @@ def data(self) -> dict:
spw = FarmSuitablePlantingWindowSignal.objects.filter(
farm=self.farm,
generated_date=self.generated_date

).first()
if spw:
try:
Expand Down Expand Up @@ -408,6 +408,93 @@ class CropInsightRequest(models.Model):
null=True, blank=True
)

task_names = ['generate_insight_report', 'generate_crop_plan']

@property
def last_background_task(self) -> BackgroundTask:
"""Return background task."""
return BackgroundTask.objects.filter(
context_id=self.id,
task_name__in=self.task_names
).last()

@property
def background_tasks(self):
"""Return background task."""
return BackgroundTask.objects.filter(
context_id=self.id,
task_name__in=self.task_names
)

@property
def background_task_running(self):
"""Return background task that is running."""
return BackgroundTask.running_tasks().filter(
context_id=self.id,
task_name__in=self.task_names
)

@staticmethod
def today_reports():
"""Return query of today reports."""
now = timezone.now()
return CropInsightRequest.objects.filter(
requested_at__gte=now.date(),
requested_at__lte=now.date() + timedelta(days=1),
)

@property
def skip_run(self):
"""Skip run process."""
background_task_running = self.background_task_running
last_running_background_task = background_task_running.last()
last_background_task = self.last_background_task

# This rule is based on the second task that basically
# is already running
# So we need to check of other task is already running

# If there are already complete task
# Skip it
if self.background_tasks.filter(status=TaskStatus.COMPLETED):
return True

# If the last running background task is
# not same with last background task
# We skip it as the last running one is other task
if last_running_background_task and (
last_running_background_task.id != last_background_task.id
):
return True

# If there are already running task 2,
# the current task is skipped
if background_task_running.count() >= 2:
return True

now = timezone.now()
try:
if self.requested_at.date() != now.date():
return True
except AttributeError:
if self.requested_at != now.date():
return True
return False

def update_note(self, message):
"""Update the notes."""
if self.last_background_task:
self.last_background_task.progress_text = message
self.last_background_task.save()
self.notes = message
self.save()

def run(self):
"""Run the generate report."""
if self.skip_run:
return
self._generate_report()

@property
def title(self) -> str:
"""Return the title of the request."""
Expand All @@ -419,8 +506,9 @@ def title(self) -> str:
f"({east_africa_timezone})"
)

def generate_report(self):
def _generate_report(self):
"""Generate reports."""
from spw.generator.crop_insight import CropInsightFarmGenerator
output = []

# If farm is empty, put empty farm
Expand All @@ -430,6 +518,11 @@ def generate_report(self):

# Get farms
for farm in farms:
# If it has farm id, generate spw
if farm.pk:
self.update_note('Generating SPW for farm: {}'.format(farm))
CropInsightFarmGenerator(farm).generate_spw()

data = CropPlanData(
farm, self.requested_at.date(),
forecast_fields=[
Expand All @@ -444,6 +537,7 @@ def generate_report(self):
output.append([val for key, val in data.items()])

# Render csv
self.update_note('Generate CSV')
csv_content = ''

# Replace header
Expand Down
28 changes: 16 additions & 12 deletions django_project/gap/tasks/crop_insight.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,26 @@ def generate_spw(farms_id: list):
def generate_insight_report(_id: list):
"""Generate insight report."""
request = CropInsightRequest.objects.get(id=_id)
request.generate_report()
request.run()


@app.task(name="generate_crop_plan")
def generate_crop_plan():
"""Generate crop plan for registered farms."""
farms = Farm.objects.all().order_by('id')
# generate crop insight for all farms
for farm in farms:
CropInsightFarmGenerator(farm).generate_spw()
# create report request
request = CropInsightRequest.objects.create(
requested_by=User.objects.filter(
is_superuser=True
).first()
)
request.farms.set(farms)
user = User.objects.filter(is_superuser=True).first()
request = CropInsightRequest.objects.create(requested_by=user)
request.farms.set(Farm.objects.all().order_by('id'))
# generate report
request.generate_report()
request.run()


@app.task(name="retry_crop_plan_generators")
def retry_crop_plan_generators():
"""Retry crop plan generator.
This will run the crop plan generators but just run the is cancelled.
If it already has spw data, it will also be skipped.
"""
for request in CropInsightRequest.today_reports():
request.run()
Loading

0 comments on commit 9a10cfc

Please sign in to comment.