From 8dbdd7d0ebff2368c89dbc9e3d1df775e0bb739a Mon Sep 17 00:00:00 2001 From: Ian Stride Date: Mon, 31 Jul 2023 16:37:23 +0100 Subject: [PATCH] Create management command to identify and fix surveys migrated from v1 --- .../management/commands/fix_surveys.py | 207 ++++++++++++++++++ .../v1_page_revision_survey_form_field.json | 14 ++ .../tests/test_fix_surveys.py | 34 +++ 3 files changed, 255 insertions(+) create mode 100644 iogt_content_migration/management/commands/fix_surveys.py create mode 100644 iogt_content_migration/tests/resources/v1_page_revision_survey_form_field.json create mode 100644 iogt_content_migration/tests/test_fix_surveys.py diff --git a/iogt_content_migration/management/commands/fix_surveys.py b/iogt_content_migration/management/commands/fix_surveys.py new file mode 100644 index 000000000..91100c38a --- /dev/null +++ b/iogt_content_migration/management/commands/fix_surveys.py @@ -0,0 +1,207 @@ +import json + +from django.core.management.base import BaseCommand +from questionnaires.models import Survey, SurveyFormField +from wagtail.core.models import PageRevision + + +class Command(BaseCommand): + def add_arguments(self, parser): + parser.add_argument( + "--fix", + action="store_true", + help="Fix problems identified by the report", + ) + + def handle(self, *args, **options): + if problems_report := report(): + for entry in problems_report: + self.stdout.write(str(entry)) + else: + self.stdout.write("No problems found") + + if options.get("fix"): + self.stdout.write("Fix application started") + fix(problems_report) + self.stdout.write("Fix application completed") + + +class ReportEntry: + def __init__(self, survey): + self.survey = survey + self.problems = report_on_revision(survey) + + @property + def id(self): + return self.survey.id + + @property + def has_problems(self): + return len(self.problems) > 0 + + def __str__(self): + status = "live " if self.survey.live else "draft" + return f'{status}, {self.survey.id}, "{self.survey.title}", {self.problems}' + + +class SurveyRevision: + def __init__(self, revision): + self.revision = revision + self.content = json.loads(revision.content_json) + + @property + def pk(self): + return self.content.get("pk") + + @property + def fields(self): + return [ + SurveyRevisionField(field) + for field in self.content.get("survey_form_fields", []) + ] + + +class SurveyRevisionField: + def __init__(self, field): + self._raw = field + + @property + def pk(self): + return self._raw.get("pk") + + @property + def label(self): + return self._raw.get("label") + + +def report(): + return [ + entry + for survey in Survey.objects.all() + if (entry := ReportEntry(survey)).has_problems + ] + + +def report_on_revision(survey): + try: + return identify_problems(survey, get_latest_revision(survey)) + except PageRevision.DoesNotExist: + return {} + + +def get_latest_revision(page): + return SurveyRevision(PageRevision.objects.filter(page=page).latest("created_at")) + + +def identify_problems(survey, revision): + return { + problem + for p in [ + field_ids_mismatch, + id_mismatch, + labels_mismatch, + no_questions, + no_revision_questions, + ] + if (problem := p(survey, revision)) + } + + +def no_questions(survey, revision): + return "no_qs" if len(survey.get_form_fields()) < 1 else None + + +def no_revision_questions(survey, revision): + return "no_rev_qs" if len(revision.fields) < 1 else None + + +def id_mismatch(survey, revision): + return "id" if revision.pk != survey.id else None + + +def field_ids_mismatch(survey, revision): + survey_field_ids = {field.id for field in survey.get_form_fields()} + revision_field_ids = {field.pk for field in revision.fields if field.pk} + + return "field_ids" if survey_field_ids != revision_field_ids else None + + +def labels_mismatch(survey, revision): + survey_field_labels = {field.label for field in survey.get_form_fields()} + revision_field_labels = {field.label for field in revision.fields if field.label} + + return "labels" if survey_field_labels != revision_field_labels else None + + +def fix(problems_report): + for entry in problems_report: + if {"id", "field_ids"}.issubset(entry.problems): + print(f"Revision update required, survey={entry.survey}") + entry.survey.save_revision(log_action=True) + elif "no_qs" in entry.problems: + print( + f"Restore fields from previous revision required, survey={entry.survey}" + ) + for field in find_first_restorable_revision(entry.survey).fields: + create_field(entry.survey, field._raw).save() + latest_revision = entry.survey.save_revision(log_action=True) + latest_revision.publish() + else: + print(f"No action taken, survey={entry.survey}") + + +def find_first_restorable_revision(page): + return next( + sr + for revision in PageRevision.objects.filter(page=page).order_by("-created_at") + if is_restorable_v1(((sr := SurveyRevision(revision))), page) + ) + + +def is_restorable_v1(revision, page): + """Identifies a v1 PageRevision that can be used to restore a v2 Survey. + PageRevisions from v1 reference primary keys that do not match the v2 database + because revisions were copied verbatim from v1. It is possible, though unlikely + that the primary keys might be the same across v1 and v2. The alternative would be + to read the surveys directly from the v1 database. This method was chosen for the + sake of convenience. + """ + return revision.pk != page.id and len(revision.fields) > 0 + + +def create_field(survey, data): + return SurveyFormField( + admin_label=data.get("admin_label"), + choices="|".join( + choice.strip() for choice in data.get("choices", "").split(",") + ), + default_value=data.get("default_value"), + field_type=( + "positivenumber" + if (ftype := data.get("field_type")) == "positive_number" + else ftype + ), + help_text=data.get("help_text"), + label=data.get("label"), + page=survey, + page_break=data.get("page_break"), + required=data.get("required"), + skip_logic=[ + create_answer_option(item) + for item in json.loads(data.get("skip_logic", "[]")) + ], + sort_order=data.get("sort_order"), + ) + + +def create_answer_option(item): + value = item.get("value", {}) + + return ( + "skip_logic", + { + "choice": value.get("choice"), + "skip_logic": value.get("skip_logic"), + "question": value.get("question"), + }, + ) diff --git a/iogt_content_migration/tests/resources/v1_page_revision_survey_form_field.json b/iogt_content_migration/tests/resources/v1_page_revision_survey_form_field.json new file mode 100644 index 000000000..53f81b28e --- /dev/null +++ b/iogt_content_migration/tests/resources/v1_page_revision_survey_form_field.json @@ -0,0 +1,14 @@ +{ + "admin_label": "Learn anything?", + "choices": "All of this information is new to me, Most of this information is new to me, A little bit of this information is new to me, None of of this information is new to me", + "default_value": "", + "field_type": "radio", + "help_text": "Please choose one answer only", + "label": "Have you learnt anything new from this All In information?", + "page": 188, + "page_break": false, + "pk": 38, + "required": true, + "skip_logic": "[{\"type\": \"skip_logic\", \"value\": {\"choice\": \"Yes\", \"skip_logic\": \"next\", \"survey\": null, \"question\": null}, \"id\": \"70666059-b8d3-493c-8259-5c58208978d9\"}, {\"type\": \"skip_logic\", \"value\": {\"choice\": \"No\", \"skip_logic\": \"next\", \"survey\": null, \"question\": null}, \"id\": \"5a719c49-1b22-40c9-ae2e-5c6440c1d0f9\"}]", + "sort_order": 0 +} diff --git a/iogt_content_migration/tests/test_fix_surveys.py b/iogt_content_migration/tests/test_fix_surveys.py new file mode 100644 index 000000000..a87f739c7 --- /dev/null +++ b/iogt_content_migration/tests/test_fix_surveys.py @@ -0,0 +1,34 @@ +import json +from pathlib import Path + +from django.test import TestCase +from iogt_content_migration.management.commands.fix_surveys import create_field +from questionnaires.factories import SurveyFactory + + +class TestFixSurveys(TestCase): + def test_create_form_field(self): + survey = SurveyFactory() + + with open(open_resource("v1_page_revision_survey_form_field.json")) as fp: + field = create_field(survey, json.load(fp)) + + self.assertEqual(len(field.skip_logic), 2) + + option = field.skip_logic[0].value + self.assertEqual(option["choice"], "Yes") + self.assertEqual(option["skip_logic"], "next") + self.assertIsNone(option["question"]) + + self.assertEqual( + field.choices, + "All of this information is new to me|" + "Most of this information is new to me|" + "A little bit of this information is new to me|" + "None of of this information is new to me", + ) + self.assertEqual(field.page.id, survey.id) + + +def open_resource(filename): + return Path(__file__).parent / "resources" / filename