Skip to content

Commit

Permalink
Create management command to identify and fix surveys migrated from v1
Browse files Browse the repository at this point in the history
  • Loading branch information
istride committed Jul 31, 2023
1 parent a72ed1c commit 8dbdd7d
Show file tree
Hide file tree
Showing 3 changed files with 255 additions and 0 deletions.
207 changes: 207 additions & 0 deletions iogt_content_migration/management/commands/fix_surveys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import json

from django.core.management.base import BaseCommand
from questionnaires.models import Survey, SurveyFormField
from wagtail.core.models import PageRevision


class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument(
"--fix",
action="store_true",
help="Fix problems identified by the report",
)

def handle(self, *args, **options):
if problems_report := report():
for entry in problems_report:
self.stdout.write(str(entry))
else:
self.stdout.write("No problems found")

if options.get("fix"):
self.stdout.write("Fix application started")
fix(problems_report)
self.stdout.write("Fix application completed")


class ReportEntry:
def __init__(self, survey):
self.survey = survey
self.problems = report_on_revision(survey)

@property
def id(self):
return self.survey.id

@property
def has_problems(self):
return len(self.problems) > 0

def __str__(self):
status = "live " if self.survey.live else "draft"
return f'{status}, {self.survey.id}, "{self.survey.title}", {self.problems}'


class SurveyRevision:
def __init__(self, revision):
self.revision = revision
self.content = json.loads(revision.content_json)

@property
def pk(self):
return self.content.get("pk")

@property
def fields(self):
return [
SurveyRevisionField(field)
for field in self.content.get("survey_form_fields", [])
]


class SurveyRevisionField:
def __init__(self, field):
self._raw = field

@property
def pk(self):
return self._raw.get("pk")

@property
def label(self):
return self._raw.get("label")


def report():
return [
entry
for survey in Survey.objects.all()
if (entry := ReportEntry(survey)).has_problems
]


def report_on_revision(survey):
try:
return identify_problems(survey, get_latest_revision(survey))
except PageRevision.DoesNotExist:
return {}


def get_latest_revision(page):
return SurveyRevision(PageRevision.objects.filter(page=page).latest("created_at"))


def identify_problems(survey, revision):
return {
problem
for p in [
field_ids_mismatch,
id_mismatch,
labels_mismatch,
no_questions,
no_revision_questions,
]
if (problem := p(survey, revision))
}


def no_questions(survey, revision):
return "no_qs" if len(survey.get_form_fields()) < 1 else None


def no_revision_questions(survey, revision):
return "no_rev_qs" if len(revision.fields) < 1 else None


def id_mismatch(survey, revision):
return "id" if revision.pk != survey.id else None


def field_ids_mismatch(survey, revision):
survey_field_ids = {field.id for field in survey.get_form_fields()}
revision_field_ids = {field.pk for field in revision.fields if field.pk}

return "field_ids" if survey_field_ids != revision_field_ids else None


def labels_mismatch(survey, revision):
survey_field_labels = {field.label for field in survey.get_form_fields()}
revision_field_labels = {field.label for field in revision.fields if field.label}

return "labels" if survey_field_labels != revision_field_labels else None


def fix(problems_report):
for entry in problems_report:
if {"id", "field_ids"}.issubset(entry.problems):
print(f"Revision update required, survey={entry.survey}")
entry.survey.save_revision(log_action=True)
elif "no_qs" in entry.problems:
print(
f"Restore fields from previous revision required, survey={entry.survey}"
)
for field in find_first_restorable_revision(entry.survey).fields:
create_field(entry.survey, field._raw).save()
latest_revision = entry.survey.save_revision(log_action=True)
latest_revision.publish()
else:
print(f"No action taken, survey={entry.survey}")


def find_first_restorable_revision(page):
return next(
sr
for revision in PageRevision.objects.filter(page=page).order_by("-created_at")
if is_restorable_v1(((sr := SurveyRevision(revision))), page)
)


def is_restorable_v1(revision, page):
"""Identifies a v1 PageRevision that can be used to restore a v2 Survey.
PageRevisions from v1 reference primary keys that do not match the v2 database
because revisions were copied verbatim from v1. It is possible, though unlikely
that the primary keys might be the same across v1 and v2. The alternative would be
to read the surveys directly from the v1 database. This method was chosen for the
sake of convenience.
"""
return revision.pk != page.id and len(revision.fields) > 0


def create_field(survey, data):
return SurveyFormField(
admin_label=data.get("admin_label"),
choices="|".join(
choice.strip() for choice in data.get("choices", "").split(",")
),
default_value=data.get("default_value"),
field_type=(
"positivenumber"
if (ftype := data.get("field_type")) == "positive_number"
else ftype
),
help_text=data.get("help_text"),
label=data.get("label"),
page=survey,
page_break=data.get("page_break"),
required=data.get("required"),
skip_logic=[
create_answer_option(item)
for item in json.loads(data.get("skip_logic", "[]"))
],
sort_order=data.get("sort_order"),
)


def create_answer_option(item):
value = item.get("value", {})

return (
"skip_logic",
{
"choice": value.get("choice"),
"skip_logic": value.get("skip_logic"),
"question": value.get("question"),
},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"admin_label": "Learn anything?",
"choices": "All of this information is new to me, Most of this information is new to me, A little bit of this information is new to me, None of of this information is new to me",
"default_value": "",
"field_type": "radio",
"help_text": "Please choose one answer only",
"label": "Have you learnt anything new from this All In information?",
"page": 188,
"page_break": false,
"pk": 38,
"required": true,
"skip_logic": "[{\"type\": \"skip_logic\", \"value\": {\"choice\": \"Yes\", \"skip_logic\": \"next\", \"survey\": null, \"question\": null}, \"id\": \"70666059-b8d3-493c-8259-5c58208978d9\"}, {\"type\": \"skip_logic\", \"value\": {\"choice\": \"No\", \"skip_logic\": \"next\", \"survey\": null, \"question\": null}, \"id\": \"5a719c49-1b22-40c9-ae2e-5c6440c1d0f9\"}]",
"sort_order": 0
}
34 changes: 34 additions & 0 deletions iogt_content_migration/tests/test_fix_surveys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import json
from pathlib import Path

from django.test import TestCase
from iogt_content_migration.management.commands.fix_surveys import create_field
from questionnaires.factories import SurveyFactory


class TestFixSurveys(TestCase):
def test_create_form_field(self):
survey = SurveyFactory()

with open(open_resource("v1_page_revision_survey_form_field.json")) as fp:
field = create_field(survey, json.load(fp))

self.assertEqual(len(field.skip_logic), 2)

option = field.skip_logic[0].value
self.assertEqual(option["choice"], "Yes")
self.assertEqual(option["skip_logic"], "next")
self.assertIsNone(option["question"])

self.assertEqual(
field.choices,
"All of this information is new to me|"
"Most of this information is new to me|"
"A little bit of this information is new to me|"
"None of of this information is new to me",
)
self.assertEqual(field.page.id, survey.id)


def open_resource(filename):
return Path(__file__).parent / "resources" / filename

0 comments on commit 8dbdd7d

Please sign in to comment.