diff --git a/hub/data_imports/geocoding_config.py b/hub/data_imports/geocoding_config.py
index ef26d48dd..ae4e4f633 100644
--- a/hub/data_imports/geocoding_config.py
+++ b/hub/data_imports/geocoding_config.py
@@ -10,6 +10,7 @@ from asgiref.sync import sync_to_async
 from hub.data_imports.utils import get_update_data
+from hub.graphql.dataloaders import FieldDataLoaderFactory
 from utils import google_maps, mapit_types
 from utils.findthatpostcode import (
     get_example_postcode_from_area_gss,
@@ -31,6 +32,18 @@ def find_config_item(source: "ExternalDataSource", key: str, value, default=None
     )
 
 
+class GeocoderContext:
+    """
+    Context class to support DataLoader creation and re-use.
+    Existing dataloaders are stored here, so each record can
+    re-use a previously created loader. This re-use is necessary
+    for dataloader batching to work.
+    """
+
+    def __init__(self):
+        self.dataloaders = {}
+
+
 # enum of geocoders: postcodes_io, mapbox, google
 class Geocoder(Enum):
     POSTCODES_IO = "postcodes_io"
@@ -38,11 +51,13 @@ class Geocoder(Enum):
     MAPBOX = "mapbox"
     GOOGLE = "google"
     AREA_GEOCODER_V2 = "AREA_GEOCODER_V2"
+    AREA_CODE_GEOCODER_V2 = "AREA_CODE_GEOCODER_V2"
     ADDRESS_GEOCODER_V2 = "ADDRESS_GEOCODER_V2"
     COORDINATE_GEOCODER_V1 = "COORDINATE_GEOCODER_V1"
 
 
 LATEST_AREA_GEOCODER = Geocoder.AREA_GEOCODER_V2
+LATEST_AREA_CODE_GEOCODER = Geocoder.AREA_CODE_GEOCODER_V2
 LATEST_ADDRESS_GEOCODER = Geocoder.ADDRESS_GEOCODER_V2
 LATEST_COORDINATE_GEOCODER = Geocoder.COORDINATE_GEOCODER_V1
 
@@ -80,6 +95,7 @@ async def import_record(
     source: "ExternalDataSource",
     data_type: "DataType",
     loaders: "Loaders",
+    geocoder_context: GeocoderContext,
 ):
     from hub.models import ExternalDataSource, GenericData
 
@@ -93,8 +109,13 @@ async def import_record(
     geocoding_config_type = source.geocoding_config.get("type", None)
     importer_fn = None
     if geocoding_config_type == ExternalDataSource.GeographyTypes.AREA:
-        geocoder = LATEST_AREA_GEOCODER
-        importer_fn = import_area_data
+        components = source.geocoding_config.get("components", [])
+        if len(components) == 1 and components[0].get("type") == "area_code":
+            geocoder = LATEST_AREA_CODE_GEOCODER
+            importer_fn = import_area_code_data
+        else:
+            geocoder = LATEST_AREA_GEOCODER
+            importer_fn = import_area_data
     elif geocoding_config_type == ExternalDataSource.GeographyTypes.ADDRESS:
         geocoder = LATEST_ADDRESS_GEOCODER
         importer_fn = import_address_data
@@ -107,8 +128,7 @@ async def import_record(
 
     # check if geocoding_config and dependent fields are the same; if so, skip geocoding
     try:
-        generic_data = await GenericData.objects.aget(data_type=data_type, data=id)
-
+        generic_data = await loaders["generic_data"].load(id)
         # First check if the configs are the same
         if (
             generic_data is not None
@@ -159,6 +179,94 @@ async def import_record(
         data_type=data_type,
         loaders=loaders,
         update_data=update_data,
+        geocoder_context=geocoder_context,
     )
 
 
+async def import_area_code_data(
+    record,
+    source: "ExternalDataSource",
+    data_type: "DataType",
+    loaders: "Loaders",
+    update_data: dict,
+    geocoder_context: GeocoderContext,
+):
+    from hub.models import Area, GenericData
+
+    update_data["geocoder"] = LATEST_AREA_CODE_GEOCODER.value
+
+    area = None
+    geocoding_data = {}
+    steps = []
+
+    components = source.geocoding_config.get("components", [])
+    if not components:
+        return
+
+    item = components[0]
+
+    literal_lih_area_type__code = item.get("metadata", {}).get(
+        "lih_area_type__code", None
+    )
+    literal_mapit_type = item.get("metadata", {}).get("mapit_type", None)
+    area_types = literal_lih_area_type__code or literal_mapit_type
+    literal_area_field = item.get("field", None)
+    area_code = source.get_record_field(record, literal_area_field)
+
+    if area_types is None or literal_area_field is None or area_code is None:
+        return
+
+    # Cast to str only after the None check: str(None) would defeat the guard above
+    area_code = str(area_code)
+    parsed_area_types = [str(s).upper() for s in ensure_list(area_types)]
+
+    area_filters = {}
+    if literal_lih_area_type__code:
+        area_filters["area_type__code"] = literal_lih_area_type__code
+    if literal_mapit_type:
+        # mapit_type metadata may be a single code or a list of codes
+        area_filters["mapit_type__in"] = ensure_list(literal_mapit_type)
+
+    AreaLoader = FieldDataLoaderFactory.get_loader_class(
+        Area, field="gss", filters=area_filters, select_related=["area_type"]
+    )
+
+    area = await AreaLoader(geocoder_context).load(area_code)
+
+    step = {
+        "type": "area_code_matching",
+        "area_types": parsed_area_types,
+        "result": "failed" if area is None else "success",
+        "search_term": area_code,
+        "data": (
+            {
+                "centroid": area.polygon.centroid.json,
+                "name": area.name,
+                "id": area.id,
+                "gss": area.gss,
+            }
+            if area is not None
+            else None
+        ),
+    }
+    steps.append(step)
+
+    if area is not None:
+        geocoding_data["area_fields"] = geocoding_data.get("area_fields", {})
+        geocoding_data["area_fields"][area.area_type.code] = area.gss
+        update_data["geocode_data"].update({"data": geocoding_data})
+        postcode_data = await get_postcode_data_for_area(area, loaders, steps)
+        update_data["postcode_data"] = postcode_data
+        update_data["area"] = area
+        update_data["point"] = area.point
+    else:
+        # Reset geocoding data so a failed match is visible on the record
+        update_data["postcode_data"] = None
+
+    # Update the geocode data regardless, for debugging purposes
+    update_data["geocode_data"].update({"steps": steps})
+
+    await GenericData.objects.aupdate_or_create(
+        data_type=data_type, data=source.get_record_id(record), defaults=update_data
+    )
@@ -168,6 +276,7 @@ async def import_area_data(
     data_type: "DataType",
     loaders: "Loaders",
     update_data: dict,
+    geocoder_context: GeocoderContext,
 ):
     from hub.models import Area, GenericData
 
@@ -419,6 +528,7 @@ async def import_address_data(
     data_type: "DataType",
     loaders: "Loaders",
     update_data: dict,
+    geocoder_context: GeocoderContext,
 ):
     """
     Converts a record fetched from the API into
@@ -580,6 +690,7 @@ async def import_coordinate_data(
     data_type: "DataType",
     loaders: "Loaders",
     update_data: dict,
+    geocoder_context: GeocoderContext,
 ):
     from hub.models import GenericData
 
diff --git a/hub/graphql/dataloaders.py b/hub/graphql/dataloaders.py
index 185f029cc..2fd5fab91 100644
--- a/hub/graphql/dataloaders.py
+++ b/hub/graphql/dataloaders.py
@@ -20,7 +20,8 @@ class BasicFieldDataLoader(dataloaders.BaseDjangoModelDataLoader):
     field: str
-    filters: dict = {}
+    filters: dict
+    select_related: list
 
     @classmethod
     def queryset(cls, keys: list[str]):
@@ -28,7 +29,7 @@ def queryset(cls, keys: list[str]):
             return []
         return cls.model.objects.filter(
             **{f"{cls.field}__in": set(keys)}, **cls.filters
-        )
+        ).select_related(*cls.select_related)
 
     @classmethod
     @sync_to_async
@@ -67,27 +68,42 @@ class FieldDataLoaderFactory(factories.BaseDjangoModelDataLoaderFactory):
 
     @classmethod
     def get_loader_key(
-        cls, model: Type["DjangoModel"], field: str, filters: dict = {}, **kwargs
+        cls,
+        model: Type["DjangoModel"],
+        field: str,
+        filters: dict = {},
+        select_related: list = [],
+        **kwargs,
     ):
-        return model, field, json.dumps(filters)
+        return model, field, json.dumps(filters), json.dumps(select_related)
 
     @classmethod
     def get_loader_class_kwargs(
-        cls, model: Type["DjangoModel"], field: str, filters: dict = {}, **kwargs
+        cls,
+        model: Type["DjangoModel"],
+        field: str,
+        filters: dict = {},
+        select_related: list = [],
+        **kwargs,
     ):
-        return {"model": model, "field": field, "filters": filters}
+        return {
+            "model": model,
+            "field": field,
+            "filters": filters,
+            "select_related": select_related,
+        }
 
     @classmethod
     def as_resolver(
-        cls, field: str, filters: dict = {}
+        cls, field: str, filters: dict = {}, select_related: list = []
     ) -> Callable[["DjangoModel", Info], Coroutine]:
         async def resolver(
             root: "DjangoModel", info: "Info"
         ):  # beware, first argument needs to be called 'root'
             field_data: "StrawberryDjangoField" = info._field
-            return await cls.get_loader_class(field_data.django_model, field, filters)(
-                context=info.context
-            ).load(getattr(root, field))
+            return await cls.get_loader_class(
+                field_data.django_model, field, filters, select_related
+            )(context=info.context).load(getattr(root, field))
 
         return resolver
diff --git a/hub/graphql/types/model_types.py b/hub/graphql/types/model_types.py
index 7d4664d51..5e7ec388c 100644
--- a/hub/graphql/types/model_types.py
+++ b/hub/graphql/types/model_types.py
@@ -45,10 +45,7 @@
 from hub.graphql.types.postcodes import PostcodesIOResult
 from hub.graphql.utils import attr_field, dict_key_field, fn_field
 from hub.management.commands.import_mps import party_shades
-from utils.geo_reference import (
-    AnalyticalAreaType,
-    area_to_postcode_io_key,
-)
+from utils.geo_reference import AnalyticalAreaType, area_to_postcode_io_key
 from utils.postcode import get_postcode_data_for_gss
 from utils.statistics import (
     attempt_interpret_series_as_float,
diff --git a/hub/models.py b/hub/models.py
index 9fd1c73d5..8911a27e1 100644
--- a/hub/models.py
+++ b/hub/models.py
@@ -1750,11 +1750,14 @@ async def import_many(self, members: list):
         )
 
         loaders = await self.get_loaders()
+        geocoder_context = geocoding_config.GeocoderContext()
 
         if self.uses_valid_geocoding_config():
             await asyncio.gather(
                 *[
-                    geocoding_config.import_record(record, self, data_type, loaders)
+                    geocoding_config.import_record(
+                        record, self, data_type, loaders, geocoder_context
+                    )
                     for record in data
                 ]
             )
@@ -2044,6 +2047,25 @@ def get_import_data(self, **kwargs):
             data_type__data_set__external_data_source_id=self.id
         )
 
+    async def imported_data_loader(self, record_ids):
+        """
+        A dataloader function for getting already-imported GenericData
+        for a given record ID. This ID should be the result of calling
+        get_record_id(record) on the source data (i.e. the record that
+        comes from the 3rd party data source) – NOT the GenericData.id.
+        """
+        results = GenericData.objects.filter(
+            data_type__data_set__external_data_source_id=self.id, data__in=record_ids
+        )
+        results = await sync_to_async(list)(results)
+        # DataLoader contract: one result per requested ID, in request order
+        return [
+            next(
+                (result for result in results if result.data == record_id),
+                None,
+            )
+            for record_id in record_ids
+        ]
+
     def get_analytics_queryset(self, **kwargs):
         return self.get_import_data()
 
@@ -2144,6 +2166,7 @@ async def get_loaders(self) -> Loaders:
             postcodesIOFromPoint=DataLoader(load_fn=get_bulk_postcode_geo_from_coords),
             fetch_record=DataLoader(load_fn=self.fetch_many_loader, cache=False),
             source_loaders=await self.get_source_loaders(),
+            generic_data=DataLoader(load_fn=self.imported_data_loader),
         )
 
         return loaders
diff --git a/hub/tests/fixtures/geocoding_cases.py b/hub/tests/fixtures/geocoding_cases.py
index b39a0fb44..44918aef5 100644
--- a/hub/tests/fixtures/geocoding_cases.py
+++ b/hub/tests/fixtures/geocoding_cases.py
@@ -293,3 +293,31 @@
         "expected_area_gss": "E05014930",
     },
 ]
+
+
+area_code_geocoding_cases = [
+    {
+        "id": "1",
+        "ward": "E05000993",
+        "expected_area_type_code": "WD23",
+        "expected_area_gss": "E05000993",
+    },
+    {
+        "id": "2",
+        "ward": "E05015081",
+        "expected_area_type_code": "WD23",
+        "expected_area_gss": "E05015081",
+    },
+    {
+        "id": "3",
+        "ward": "E05012085",
+        "expected_area_type_code": "WD23",
+        "expected_area_gss": "E05012085",
+    },
+    {
+        "id": "4",
+        "ward": "E05007461",
+        "expected_area_type_code": "WD23",
+        "expected_area_gss": "E05007461",
+    },
+]
diff --git a/hub/tests/test_external_data_source_parsers.py b/hub/tests/test_external_data_source_parsers.py
index f59a1bcf4..963a3de77 100644
--- a/hub/tests/test_external_data_source_parsers.py
+++ b/hub/tests/test_external_data_source_parsers.py
@@ -9,7 +9,10 @@
 from asgiref.sync import async_to_sync
 
 from hub.models import Area, DatabaseJSONSource, ExternalDataSource
-from hub.tests.fixtures.geocoding_cases import geocoding_cases
+from hub.tests.fixtures.geocoding_cases import (
+    area_code_geocoding_cases,
+    geocoding_cases,
+)
 from hub.validation import validate_and_format_phone_number
 from utils import mapit_types
 
@@ -584,3 +587,186 @@ def test_skipping(self):
             print("--Geocode data:", d.id, json.dumps(d.geocode_data, indent=4))
             print("--Postcode data:", d.id, json.dumps(d.postcode_data, indent=4))
             raise
+
+
+@skipIf(ignore_geocoding_tests, "It messes up data for other tests.")
+class TestAreaCodeGeocoding(TestCase):
+    fixture = area_code_geocoding_cases
+
+    @classmethod
+    def setUpTestData(cls):
+        subprocess.call("bin/import_areas_into_test_db.sh")
+
+        cls.source = DatabaseJSONSource.objects.create(
+            name="geo_test",
+            id_field="id",
+            data=cls.fixture.copy(),
+            geocoding_config={
+                "type": ExternalDataSource.GeographyTypes.AREA,
+                "components": [
+                    {
+                        "field": "ward",
+                        "type": "area_code",
+                        "metadata": {"lih_area_type__code": "WD23"},
+                    },
+                ],
+            },
+        )
+
+    def test_geocoding_test_rig_is_valid(self):
+        self.assertGreaterEqual(Area.objects.count(), 19542)
+        self.assertGreaterEqual(
+            Area.objects.filter(polygon__isnull=False).count(), 19542
+        )
+        self.assertGreaterEqual(Area.objects.filter(area_type__code="DIS").count(), 164)
+        self.assertGreaterEqual(Area.objects.filter(area_type__code="STC").count(), 218)
+        self.assertGreaterEqual(
+            Area.objects.filter(area_type__code="WD23").count(), 8000
+        )
+
+        # re-generate GenericData records
+        async_to_sync(self.source.import_many)(self.source.data)
+
+        # load up the data for tests
+        self.data = self.source.get_import_data()
+
+        for d in self.data:
+            if d.json["expected_area_gss"] is not None:
+                # Raises Area.DoesNotExist, failing the test, if the
+                # expected area was not imported by the fixture script
+                Area.objects.get(gss=d.json["expected_area_gss"])
+
+    def test_geocoding_matches(self):
+        # re-generate GenericData records
+        async_to_sync(self.source.import_many)(self.source.data)
+
+        # load up the data for tests
+        self.data = self.source.get_import_data()
+
+        self.assertEqual(
+            len(self.data),
+            len(self.source.data),
+            "All data should be imported.",
+        )
+
+        for d in self.data:
+            try:
+                try:
+                    self.assertEqual(
+                        d.geocode_data["data"]["area_fields"][
+                            d.json["expected_area_type_code"]
+                        ],
+                        d.json["expected_area_gss"],
+                    )
+                    self.assertFalse(
+                        d.geocode_data["skipped"], "Geocoding should be done."
+                    )
+                    self.assertIsNotNone(d.postcode_data)
+                    self.assertGreaterEqual(
+                        len(d.geocode_data["steps"]),
+                        1,
+                        "Geocoding outcomes should be debuggable, for future development.",
+                    )
+                except KeyError:
+                    raise AssertionError("Expected geocoding data was missing.")
+            except (AssertionError, TypeError) as e:
+                print(e)
+                print("Geocoding failed:", d.id, json.dumps(d.json, indent=4))
+                print("--Geocode data:", d.id, json.dumps(d.geocode_data, indent=4))
+                print("--Postcode data:", d.id, json.dumps(d.postcode_data, indent=4))
+                raise
+
+    def test_by_mapit_types(self):
+        """
+        Geocoding should work identically on the more granular mapit_types
+        """
+
+        self.source.geocoding_config = {
+            "type": ExternalDataSource.GeographyTypes.AREA,
+            "components": [
+                {
+                    "field": "ward",
+                    "type": "area_code",
+                    "metadata": {"mapit_type": mapit_types.MAPIT_WARD_TYPES},
+                },
+            ],
+        }
+        self.source.save()
+
+        # re-generate GenericData records
+        async_to_sync(self.source.import_many)(self.source.data)
+
+        # load up the data for tests
+        self.data = self.source.get_import_data()
+
+        for d in self.data:
+            try:
+                try:
+                    self.assertEqual(
+                        d.geocode_data["data"]["area_fields"][
+                            d.json["expected_area_type_code"]
+                        ],
+                        d.json["expected_area_gss"],
+                    )
+                    self.assertIsNotNone(d.postcode_data)
+                    self.assertDictEqual(
+                        dict(self.source.geocoding_config),
+                        dict(d.geocode_data.get("config", {})),
+                        "Geocoding config should be the same as the source's",
+                    )
+                    self.assertFalse(
+                        d.geocode_data["skipped"], "Geocoding should be done."
+                    )
+                except KeyError:
+                    raise AssertionError("Expected geocoding data was missing.")
+            except AssertionError as e:
+                print(e)
+                print("Geocoding failed:", d.id, json.dumps(d.json, indent=4))
+                print("--Geocode data:", d.id, json.dumps(d.geocode_data, indent=4))
+                print("--Postcode data:", d.id, json.dumps(d.postcode_data, indent=4))
+                raise
+
+    def test_skipping(self):
+        """
+        If the geocoding config and the data are both unchanged, geocoding should be skipped
+        """
+        # generate GenericData records — the first time, they should all geocode
+        async_to_sync(self.source.import_many)(self.source.data)
+
+        # re-generate GenericData records — this time, they should all skip
+        async_to_sync(self.source.import_many)(self.source.data)
+
+        # load up the data for tests
+        self.data = self.source.get_import_data()
+
+        for d in self.data:
+            try:
+                try:
+                    if d.json["expected_area_gss"] is not None:
+                        self.assertTrue(
+                            d.geocode_data["skipped"], "Geocoding should be skipped."
+                        )
+                    self.assertIsNotNone(d.postcode_data)
+                except KeyError:
+                    raise AssertionError("Expected geocoding data was missing.")
+            except AssertionError as e:
+                print(e)
+                print(
+                    "Geocoding was repeated unnecessarily:",
+                    d.id,
+                    json.dumps(d.json, indent=4),
+                )
+                print("--Geocode data:", d.id, json.dumps(d.geocode_data, indent=4))
+                print("--Postcode data:", d.id, json.dumps(d.postcode_data, indent=4))
+                raise
diff --git a/local_intelligence_hub/settings.py b/local_intelligence_hub/settings.py
index 19fb191af..c4b9b84bc 100644
--- a/local_intelligence_hub/settings.py
+++ b/local_intelligence_hub/settings.py
@@ -443,34 +443,16 @@
         },
     },
     "loggers": {
-        "procrastinate": (
-            {
-                "level": "DEBUG",
-                "handlers": ["console"],
-                "class": "logging.StreamHandler",
-                "formatter": "procrastinate",
-            }
-            if ENVIRONMENT != "production"
-            else {
-                "handlers": ["truncated"],
-                "level": "DEBUG",
-            }
-        ),
-        # Silence endless waiting for job log
-        "procrastinate.worker": (
-            {
-                "level": "DEBUG",
-                "handlers": ["console"],
-                "class": "logging.StreamHandler",
-                "formatter": "procrastinate",
-            }
-            if ENVIRONMENT != "production"
-            else {
-                "handlers": ["truncated"],
-                "level": "INFO",
-                "propagate": False,
-            }
-        ),
+        "procrastinate": {
+            "handlers": ["truncated"],
+            "level": "DEBUG",
+        },
+        # Silence the endless "waiting for job" log on production
+        "procrastinate.worker.wait_for_job": {
+            "handlers": ["console"],
+            "level": "INFO" if ENVIRONMENT == "production" else "DEBUG",
+            "propagate": False,
+        },
         "django": {
             "handlers": ["console"],
             "level": DJANGO_LOG_LEVEL,
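
Note on the batching mechanism this change relies on: GeocoderContext only stores loaders; the batching itself comes from the DataLoader pattern, in which every load(key) issued during the same event-loop tick is collected and resolved by one batched fetch. The sketch below is a minimal, self-contained model of that behaviour and is illustrative only: MiniLoader, get_loader and fetch_areas are hypothetical stand-ins, not the strawberry DataLoader or FieldDataLoaderFactory APIs used in this repo.

    import asyncio


    class MiniLoader:
        """Collects keys requested in the same event-loop tick and resolves
        them all with one batched call (a toy model of DataLoader batching)."""

        def __init__(self, batch_fn):
            self.batch_fn = batch_fn
            self._pending = {}  # key -> Future, de-duplicates repeated keys
            self._scheduled = False

        async def load(self, key):
            loop = asyncio.get_running_loop()
            if key not in self._pending:
                self._pending[key] = loop.create_future()
            if not self._scheduled:
                self._scheduled = True
                # Flush once every currently-running coroutine has asked for its key
                loop.call_soon(lambda: asyncio.ensure_future(self._dispatch()))
            return await self._pending[key]

        async def _dispatch(self):
            pending, self._pending = self._pending, {}
            self._scheduled = False
            results = await self.batch_fn(list(pending))
            for key, future in pending.items():
                future.set_result(results.get(key))


    class GeocoderContext:
        """Mirrors the PR's GeocoderContext: one loader per key, shared by all records."""

        def __init__(self):
            self.dataloaders = {}

        def get_loader(self, key, batch_fn):
            if key not in self.dataloaders:
                self.dataloaders[key] = MiniLoader(batch_fn)
            return self.dataloaders[key]


    async def fetch_areas(gss_codes):
        # Stand-in for one Area.objects.filter(gss__in=...) query per batch
        print("one batched lookup for:", gss_codes)
        return {gss: f"Area<{gss}>" for gss in gss_codes}


    async def import_record(gss, context):
        loader = context.get_loader(("Area", "gss"), fetch_areas)
        return await loader.load(gss)


    async def main():
        context = GeocoderContext()
        records = ["E05000993", "E05015081", "E05000993"]
        # As in ExternalDataSource.import_many: gather concurrent imports,
        # which the shared loader collapses into a single batched lookup
        print(await asyncio.gather(*[import_record(g, context) for g in records]))


    asyncio.run(main())

Because import_many gathers all import_record coroutines against one shared GeocoderContext, the per-record Area lookups collapse into a single gss__in query; select_related=["area_type"] additionally means the later area.area_type.code access adds no extra query per area.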
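
A related note on imported_data_loader in hub/models.py: a DataLoader batch function must return exactly one result per input key, in input order, with None for misses, which is why the method re-aligns the unordered queryset rows against record_ids. A toy illustration of that contract, using made-up names (FakeRecord and align_results are not repo code):

    from dataclasses import dataclass


    @dataclass
    class FakeRecord:
        data: str  # the third-party record ID, mirroring GenericData.data


    def align_results(record_ids, results):
        """Return one result per requested ID, in request order, None for misses."""
        by_id = {result.data: result for result in results}
        return [by_id.get(record_id) for record_id in record_ids]


    rows = [FakeRecord("b"), FakeRecord("a")]  # the DB returns rows in arbitrary order
    print(align_results(["a", "b", "c"], rows))
    # [FakeRecord(data='a'), FakeRecord(data='b'), None]

Indexing the rows in a dict keyed on data, as above, would also let imported_data_loader avoid the quadratic next(...) scan when a source imports many records.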