From 9a10cfc553e69236fdf6d5bf7782248aedc5d4e6 Mon Sep 17 00:00:00 2001 From: Irwan Fathurrahman Date: Tue, 10 Sep 2024 15:55:42 +0700 Subject: [PATCH] Retry crop generator if failed (#127) * Handle run of crop generator request * Handle crop generator to skip if still running or complete * Update skip run handler * Add retry crop plan generator every hour * Update based on failed tests --- django_project/core/celery.py | 9 +- django_project/core/models/background_task.py | 32 ++- django_project/gap/admin/crop_insight.py | 30 ++- django_project/gap/models/crop_insight.py | 100 +++++++- django_project/gap/tasks/crop_insight.py | 28 +- .../crop_insight/test_task_crop_insight.py | 239 ++++++++++++++++++ django_project/spw/generator/crop_insight.py | 13 +- .../spw/tests/test_crop_insight_generator.py | 21 +- 8 files changed, 429 insertions(+), 43 deletions(-) create mode 100644 django_project/gap/tests/crop_insight/test_task_crop_insight.py diff --git a/django_project/core/celery.py b/django_project/core/celery.py index 7f64572c..1f75bc3f 100644 --- a/django_project/core/celery.py +++ b/django_project/core/celery.py @@ -1,8 +1,8 @@ """Tomorrow Now GAP.""" from __future__ import absolute_import, unicode_literals -import os import logging +import os from celery import Celery, signals from celery.result import AsyncResult @@ -11,7 +11,6 @@ from celery.worker.control import inspect_command from django.utils import timezone - logger = logging.getLogger(__name__) # set the default Django settings module for the 'celery' program. @@ -50,6 +49,10 @@ # Run everyday at 01:30 UTC or 04:30 EAT 'schedule': crontab(minute='30', hour='1'), }, + 'retry-crop-plan-generators': { + 'task': 'retry_crop_plan_generators', + 'schedule': crontab(minute='0', hour='*'), + }, 'salient-collector-session': { 'task': 'salient_collector_session', # Run every Monday 02:00 UTC @@ -297,7 +300,7 @@ def task_failure_handler( @signals.task_revoked.connect -def task_revoked_handler(sender, request = None, **kwargs): +def task_revoked_handler(sender, request=None, **kwargs): """Handle a cancelled task. :param sender: task sender diff --git a/django_project/core/models/background_task.py b/django_project/core/models/background_task.py index c3183a09..8b0c29c7 100644 --- a/django_project/core/models/background_task.py +++ b/django_project/core/models/background_task.py @@ -6,16 +6,15 @@ """ - -import uuid import logging -from traceback import format_tb +import uuid from ast import literal_eval as make_tuple -from django.db import models -from django.utils.translation import gettext_lazy as _ +from traceback import format_tb + from django.conf import settings +from django.db import models from django.utils import timezone - +from django.utils.translation import gettext_lazy as _ logger = logging.getLogger(__name__) @@ -137,10 +136,25 @@ class BackgroundTask(models.Model): blank=True ) + running_states = [ + TaskStatus.PENDING, TaskStatus.QUEUED, TaskStatus.RUNNING + ] + def __str__(self): """Get string representation.""" return str(self.uuid) + @staticmethod + def running_tasks(): + """Return all running tasks.""" + return BackgroundTask.objects.filter( + status__in=BackgroundTask.running_states + ) + + def is_running(self): + """Check if task is running.""" + return self.status in BackgroundTask.running_states + @property def requester_name(self): """Get the requester name.""" @@ -284,7 +298,7 @@ def task_on_retried(self, reason): update_fields=['last_update', 'progress_text'] ) - def is_possible_interrupted(self, delta = 1800): + def is_possible_interrupted(self, delta=1800): """Check whether the task is stuck or being interrupted. This requires the task to send an update to BackgroundTask. @@ -294,8 +308,8 @@ def is_possible_interrupted(self, delta = 1800): :rtype: bool """ if ( - self.status == TaskStatus.QUEUED or - self.status == TaskStatus.RUNNING + self.status == TaskStatus.QUEUED or + self.status == TaskStatus.RUNNING ): # check if last_update is more than 30mins than current date time if self.last_update: diff --git a/django_project/gap/admin/crop_insight.py b/django_project/gap/admin/crop_insight.py index 87086014..83941199 100644 --- a/django_project/gap/admin/crop_insight.py +++ b/django_project/gap/admin/crop_insight.py @@ -104,7 +104,7 @@ class FarmCropVarietyAdmin(admin.ModelAdmin): def generate_insight_report_action(modeladmin, request, queryset): """Generate insight report.""" for query in queryset: - generate_insight_report(query.id) + generate_insight_report.delay(query.id) modeladmin.message_user( request, 'Process will be started in background!', @@ -116,18 +116,17 @@ def generate_insight_report_action(modeladmin, request, queryset): class CropInsightRequestAdmin(admin.ModelAdmin): """Admin for CropInsightRequest.""" - list_display = ('requested_at', 'farm_list', 'file_url') + list_display = ( + 'requested_at', 'farm_count', 'file_url', 'last_task_status', + 'background_tasks' + ) filter_horizontal = ('farms',) actions = (generate_insight_report_action,) readonly_fields = ('file',) - def farm_list(self, obj: CropInsightRequest): + def farm_count(self, obj: CropInsightRequest): """Return farm list.""" - return [farm.unique_id for farm in obj.farms.all()] - - def file(self, obj: CropInsightRequest): - """Return file path.""" - return [farm.unique_id for farm in obj.farms.all()] + return obj.farms.count() def file_url(self, obj): """Return file url.""" @@ -137,3 +136,18 @@ def file_url(self, obj): f'target="__blank__">{obj.file.url}' ) return '-' + + def last_task_status(self, obj: CropInsightRequest): + """Return task status.""" + bg_task = obj.last_background_task + if bg_task: + return bg_task.status + return None + + def background_tasks(self, obj: CropInsightRequest): + """Return ids of background tasks that are running.""" + url = ( + f"/admin/core/backgroundtask/?context_id__exact={obj.id}&" + f"task_name__in={','.join(CropInsightRequest.task_names)}" + ) + return format_html(f'link') diff --git a/django_project/gap/models/crop_insight.py b/django_project/gap/models/crop_insight.py index 4c5ef910..65c494ee 100644 --- a/django_project/gap/models/crop_insight.py +++ b/django_project/gap/models/crop_insight.py @@ -7,7 +7,7 @@ import json import uuid -from datetime import date +from datetime import date, timedelta from django.conf import settings from django.contrib.auth import get_user_model @@ -17,6 +17,7 @@ from django.utils import timezone from core.group_email_receiver import crop_plan_receiver +from core.models.background_task import BackgroundTask, TaskStatus from core.models.common import Definition from gap.models import Farm from gap.models.lookup import RainfallClassification @@ -340,7 +341,6 @@ def data(self) -> dict: spw = FarmSuitablePlantingWindowSignal.objects.filter( farm=self.farm, generated_date=self.generated_date - ).first() if spw: try: @@ -408,6 +408,93 @@ class CropInsightRequest(models.Model): null=True, blank=True ) + task_names = ['generate_insight_report', 'generate_crop_plan'] + + @property + def last_background_task(self) -> BackgroundTask: + """Return background task.""" + return BackgroundTask.objects.filter( + context_id=self.id, + task_name__in=self.task_names + ).last() + + @property + def background_tasks(self): + """Return background task.""" + return BackgroundTask.objects.filter( + context_id=self.id, + task_name__in=self.task_names + ) + + @property + def background_task_running(self): + """Return background task that is running.""" + return BackgroundTask.running_tasks().filter( + context_id=self.id, + task_name__in=self.task_names + ) + + @staticmethod + def today_reports(): + """Return query of today reports.""" + now = timezone.now() + return CropInsightRequest.objects.filter( + requested_at__gte=now.date(), + requested_at__lte=now.date() + timedelta(days=1), + ) + + @property + def skip_run(self): + """Skip run process.""" + background_task_running = self.background_task_running + last_running_background_task = background_task_running.last() + last_background_task = self.last_background_task + + # This rule is based on the second task that basically + # is already running + # So we need to check of other task is already running + + # If there are already complete task + # Skip it + if self.background_tasks.filter(status=TaskStatus.COMPLETED): + return True + + # If the last running background task is + # not same with last background task + # We skip it as the last running one is other task + if last_running_background_task and ( + last_running_background_task.id != last_background_task.id + ): + return True + + # If there are already running task 2, + # the current task is skipped + if background_task_running.count() >= 2: + return True + + now = timezone.now() + try: + if self.requested_at.date() != now.date(): + return True + except AttributeError: + if self.requested_at != now.date(): + return True + return False + + def update_note(self, message): + """Update the notes.""" + if self.last_background_task: + self.last_background_task.progress_text = message + self.last_background_task.save() + self.notes = message + self.save() + + def run(self): + """Run the generate report.""" + if self.skip_run: + return + self._generate_report() + @property def title(self) -> str: """Return the title of the request.""" @@ -419,8 +506,9 @@ def title(self) -> str: f"({east_africa_timezone})" ) - def generate_report(self): + def _generate_report(self): """Generate reports.""" + from spw.generator.crop_insight import CropInsightFarmGenerator output = [] # If farm is empty, put empty farm @@ -430,6 +518,11 @@ def generate_report(self): # Get farms for farm in farms: + # If it has farm id, generate spw + if farm.pk: + self.update_note('Generating SPW for farm: {}'.format(farm)) + CropInsightFarmGenerator(farm).generate_spw() + data = CropPlanData( farm, self.requested_at.date(), forecast_fields=[ @@ -444,6 +537,7 @@ def generate_report(self): output.append([val for key, val in data.items()]) # Render csv + self.update_note('Generate CSV') csv_content = '' # Replace header diff --git a/django_project/gap/tasks/crop_insight.py b/django_project/gap/tasks/crop_insight.py index 24a3d9c7..96fbbca0 100644 --- a/django_project/gap/tasks/crop_insight.py +++ b/django_project/gap/tasks/crop_insight.py @@ -28,22 +28,26 @@ def generate_spw(farms_id: list): def generate_insight_report(_id: list): """Generate insight report.""" request = CropInsightRequest.objects.get(id=_id) - request.generate_report() + request.run() @app.task(name="generate_crop_plan") def generate_crop_plan(): """Generate crop plan for registered farms.""" - farms = Farm.objects.all().order_by('id') - # generate crop insight for all farms - for farm in farms: - CropInsightFarmGenerator(farm).generate_spw() # create report request - request = CropInsightRequest.objects.create( - requested_by=User.objects.filter( - is_superuser=True - ).first() - ) - request.farms.set(farms) + user = User.objects.filter(is_superuser=True).first() + request = CropInsightRequest.objects.create(requested_by=user) + request.farms.set(Farm.objects.all().order_by('id')) # generate report - request.generate_report() + request.run() + + +@app.task(name="retry_crop_plan_generators") +def retry_crop_plan_generators(): + """Retry crop plan generator. + + This will run the crop plan generators but just run the is cancelled. + If it already has spw data, it will also be skipped. + """ + for request in CropInsightRequest.today_reports(): + request.run() diff --git a/django_project/gap/tests/crop_insight/test_task_crop_insight.py b/django_project/gap/tests/crop_insight/test_task_crop_insight.py new file mode 100644 index 00000000..8ff5be3d --- /dev/null +++ b/django_project/gap/tests/crop_insight/test_task_crop_insight.py @@ -0,0 +1,239 @@ +# coding=utf-8 +"""Tomorrow Now GAP. + +.. note:: Unit tests for GAP Models. +""" + +import datetime +from unittest.mock import patch + +from django.test import TestCase +from django.utils import timezone + +from core.factories import BackgroundTaskF +from core.models.background_task import TaskStatus +from gap.factories import CropInsightRequestFactory +from gap.models import CropInsightRequest +from gap.tasks.crop_insight import retry_crop_plan_generators + + +class CropInsideTaskRUDTest(TestCase): + """Crop test case.""" + + Factory = CropInsightRequestFactory + Model = CropInsightRequest + + def test_today_reports(self): + """Test query today reports.""" + now = timezone.now() + self.Factory(requested_at=now) + self.Factory(requested_at=now) + self.Factory(requested_at=now + datetime.timedelta(days=-1)) + self.assertEqual(CropInsightRequest.today_reports().count(), 2) + + @patch('gap.models.crop_insight.CropInsightRequest._generate_report') + def test_running(self, mock_generate_report): + """Test skip run.""" + # No skip running of no bg task + report = self.Factory() + self.assertFalse(report.skip_run) + + # ----------------------------------------------------- + # For first background, everything are run + # ----------------------------------------------------- + # No skip running if bg task is still PENDING + bg_task_1 = BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report.id + ) + bg_task_1.status = TaskStatus.PENDING + bg_task_1.save() + report.run() + self.assertEqual(mock_generate_report.call_count, 1) + + # Skip running if bg task is QUEUED + bg_task_1.status = TaskStatus.QUEUED + report.run() + self.assertEqual(mock_generate_report.call_count, 2) + + # Skip running if bg task is still RUNNING + bg_task_1.status = TaskStatus.RUNNING + report.run() + self.assertEqual(mock_generate_report.call_count, 3) + + # Skip running if bg task is COMPLETED + bg_task_1.status = TaskStatus.COMPLETED + report.run() + self.assertEqual(mock_generate_report.call_count, 4) + + # No skip running if bg task is CANCELLED + bg_task_1.status = TaskStatus.CANCELLED + report.run() + self.assertEqual(mock_generate_report.call_count, 5) + + # No skip running if bg task is STOPPED + bg_task_1.status = TaskStatus.STOPPED + report.run() + self.assertEqual(mock_generate_report.call_count, 6) + + # No skip running if bg task is INVALIDATED + bg_task_1.status = TaskStatus.INVALIDATED + report.run() + self.assertEqual(mock_generate_report.call_count, 7) + + # ----------------------------------------------------- + # The second one is skipped if first one is still running + # ----------------------------------------------------- + # No skip running if bg task is still PENDING + bg_task_1.status = TaskStatus.RUNNING + bg_task_1.save() + bg_task_2 = BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report.id + ) + bg_task_2.status = TaskStatus.PENDING + bg_task_2.save() + report.run() + self.assertEqual(mock_generate_report.call_count, 7) + + # Skip running if bg task is QUEUED + bg_task_2.status = TaskStatus.QUEUED + bg_task_2.save() + report.run() + self.assertEqual(mock_generate_report.call_count, 7) + + # Skip running if bg task is still RUNNING + bg_task_2.status = TaskStatus.RUNNING + bg_task_2.save() + report.run() + self.assertEqual(mock_generate_report.call_count, 7) + + # Skip running if bg task is COMPLETED + bg_task_2.status = TaskStatus.COMPLETED + bg_task_2.save() + report.run() + self.assertEqual(mock_generate_report.call_count, 7) + + # No skip running if bg task is CANCELLED + bg_task_2.status = TaskStatus.CANCELLED + bg_task_2.save() + report.run() + self.assertEqual(mock_generate_report.call_count, 7) + + # No skip running if bg task is STOPPED + bg_task_2.status = TaskStatus.STOPPED + bg_task_2.save() + report.run() + self.assertEqual(mock_generate_report.call_count, 7) + + # No skip running if bg task is INVALIDATED + bg_task_2.status = TaskStatus.INVALIDATED + bg_task_2.save() + report.run() + self.assertEqual(mock_generate_report.call_count, 7) + + @patch('gap.models.crop_insight.CropInsightRequest._generate_report') + def test_retry(self, mock_generate_report): + """Test skip run.""" + # No skip running of no bg task + now = timezone.now() + report_1 = self.Factory() + report_2 = self.Factory() + report_3 = self.Factory() + report_4 = self.Factory() + report_5 = self.Factory() + report_6 = self.Factory() + report_7 = self.Factory() + + # Report 8 is the older one + report_8 = self.Factory( + requested_at=now + datetime.timedelta(days=-1) + ) + + # Below is older tasks + BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report_1.id, + status=TaskStatus.PENDING + ) + BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report_2.id, + status=TaskStatus.QUEUED + ) + BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report_3.id, + status=TaskStatus.RUNNING + ) + BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report_4.id, + status=TaskStatus.COMPLETED + ) + BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report_5.id, + status=TaskStatus.CANCELLED + ) + BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report_6.id, + status=TaskStatus.STOPPED + ) + BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report_7.id, + status=TaskStatus.INVALIDATED + ) + BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report_8.id, + status=TaskStatus.CANCELLED + ) + + # Below is new task that will be retried + + BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report_1.id + ) + BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report_2.id + ) + BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report_3.id + ) + BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report_4.id + ) + BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report_5.id + ) + BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report_6.id + ) + BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report_7.id + ) + BackgroundTaskF.create( + task_name='generate_crop_plan', + context_id=report_8.id + ) + + # Retry all + retry_crop_plan_generators() + + # The report that will be retried are + # all of today report that are + # - CANCELLED + # - STOPPED + # - INVALIDATED + # It should be report 5, 6 and 7 + self.assertEqual(mock_generate_report.call_count, 3) diff --git a/django_project/spw/generator/crop_insight.py b/django_project/spw/generator/crop_insight.py index ad8379bc..f0cf5997 100644 --- a/django_project/spw/generator/crop_insight.py +++ b/django_project/spw/generator/crop_insight.py @@ -8,6 +8,7 @@ from datetime import date, datetime, timedelta import pytz +from django.db import transaction from gap.models.crop_insight import ( FarmSuitablePlantingWindowSignal, FarmShortTermForecast, @@ -23,10 +24,10 @@ class CropInsightFarmGenerator: """Insight Farm Generator.""" - def __init__(self, farm: Farm): + def __init__(self, farm: Farm, requested_date=date.today()): """Init Generator.""" self.farm = farm - self.today = date.today() + self.today = requested_date self.tomorrow = self.today + timedelta(days=1) self.attributes = calculate_from_point_attrs() @@ -72,6 +73,14 @@ def save_shortterm_forecast(self, historical_dict, farm: Farm): pass def generate_spw(self): + """Generate spw. + + Do atomic because need all data to be saved. + """ + with transaction.atomic(): + self._generate_spw() + + def _generate_spw(self): """Generate Farm SPW.""" # Check already being generated, no regenereated! if FarmSuitablePlantingWindowSignal.objects.filter( diff --git a/django_project/spw/tests/test_crop_insight_generator.py b/django_project/spw/tests/test_crop_insight_generator.py index f799a905..e5755a75 100644 --- a/django_project/spw/tests/test_crop_insight_generator.py +++ b/django_project/spw/tests/test_crop_insight_generator.py @@ -248,6 +248,19 @@ def create_timeline_data( self.assertEqual( forecast.farmshorttermforecastdata_set.count(), 55 ) + # For farm 3 + mock_fetch_ltn_data.return_value = {} + mock_execute_spw_model.return_value = ( + True, { + 'metadata': { + 'test': 'abcdef' + }, + 'goNoGo': '', + 'nearDaysLTNPercent': [10.0], + 'nearDaysCurPercent': [60.0], + } + ) + mock_fetch_timelines_data.return_value = {} # Crop insight report self.request = CropInsightRequestFactory.create() @@ -346,19 +359,15 @@ def create_timeline_data( row_num += 1 @patch('spw.generator.crop_insight.CropInsightFarmGenerator.generate_spw') - @patch('gap.models.crop_insight.CropInsightRequest.generate_report') def test_generate_crop_plan( - self, mock_generate_report, mock_generate_spw + self, mock_generate_spw ): """Test generate crop plan for all farms.""" generate_crop_plan() self.assertEqual( CropInsightRequest.objects.count(), 1 ) - - # Called by 5 farms self.assertEqual(mock_generate_spw.call_count, 5) - mock_generate_report.assert_called_once() def test_email_send(self): """Test email send when report created.""" @@ -374,7 +383,7 @@ def mock_send_fn(self, fail_silently=False): "django.core.mail.EmailMessage.send", mock_send_fn ): request = CropInsightRequestFactory.create() - request.generate_report() + request.run() parent.assertEqual(len(self.recipients), 2) parent.assertEqual(