feat: flatgeobuf data extracts on frontend & backend (#1241)
* refactor: add data_extract_url to ProjectOut schema

* feat: load remote flatgeobuf data extracts from S3

* fix(frontend): correctly load nested fgb GeometryCollection type

* feat: read/write flatgeobuf, split geoms by task in database

* feat: split fgb extract by task, generate geojson form media

* refactor: deletion comment for frontend files not required

* build: remove features table from db

* feat: allow uploading of custom data extracts in fgb format

* test: update tests to use flatgeobuf data extracts

* refactor: rename var for clarity

* ci: minify backend test data + ignore from prettier
spwoodcock authored Feb 22, 2024
1 parent d13579c commit f54b30a
Showing 37 changed files with 812 additions and 1,077 deletions.
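
The bullets above describe moving data extracts to FlatGeobuf files stored remotely (e.g. on S3), with the URL exposed through the new data_extract_url field on the ProjectOut schema. A minimal sketch of that flow follows (Python); it is not part of the diff, and the boto3 upload, bucket name, and URL scheme are illustrative assumptions only:

import boto3
import geojson
from sqlalchemy.orm import Session

from app.db.postgis_utils import geojson_to_flatgeobuf


async def store_data_extract(db: Session, project_id: int, geojson_str: str) -> str:
    """Convert an uploaded GeoJSON extract to FlatGeobuf and store it remotely."""
    featcol = geojson.loads(geojson_str)

    # PostGIS-backed conversion added in postgis_utils.py (further down this diff)
    fgb_bytes = await geojson_to_flatgeobuf(db, featcol)
    if not fgb_bytes:
        raise ValueError("FlatGeobuf conversion returned no data")

    # Illustrative S3 upload only: the bucket name, key layout, and public URL
    # are assumptions, not part of this commit
    s3_key = f"{project_id}/data_extract.fgb"
    boto3.client("s3").put_object(Bucket="fmtm-data", Key=s3_key, Body=fgb_bytes)

    # This URL is what the new ProjectOut.data_extract_url field would expose
    return f"https://s3.example.net/fmtm-data/{s3_key}"
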
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -33,6 +33,7 @@ repos:
"!CONTRIBUTING.md",
"!LICENSE.md",
"!src/frontend/pnpm-lock.yaml",
"!src/backend/tests/test_data/**",
]

# # Lint: Dockerfile (disabled until binary is bundled)
26 changes: 18 additions & 8 deletions src/backend/app/central/central_crud.py
@@ -18,11 +18,11 @@
"""Logic for interaction with ODK Central & data."""

import os
from io import BytesIO
from pathlib import Path
from typing import Optional
from xml.etree import ElementTree

# import osm_fieldwork
# Qr code imports
from fastapi import HTTPException
from fastapi.responses import JSONResponse
from loguru import logger as log
@@ -210,11 +210,11 @@ def upload_xform_media(

def create_odk_xform(
project_id: int,
xform_id: str,
xform_name: str,
filespec: str,
feature_geojson: BytesIO,
odk_credentials: Optional[project_schemas.ODKCentralDecrypted] = None,
create_draft: bool = False,
upload_media=True,
convert_to_draft_when_publishing=True,
):
"""Create an XForm on a remote ODK Central server."""
@@ -236,16 +236,26 @@ def create_odk_xform(
status_code=500, detail={"message": "Connection failed to odk central"}
) from e

result = xform.createForm(project_id, xform_id, filespec, create_draft)
result = xform.createForm(project_id, xform_name, filespec, create_draft)

if result != 200 and result != 409:
return result
data = f"/tmp/{title}.geojson"

# TODO refactor osm_fieldwork.OdkCentral.OdkForm.uploadMedia
# to accept passing a bytesio object and update
geojson_file = Path(f"/tmp/{title}.geojson")
with open(geojson_file, "w") as f:
f.write(feature_geojson.getvalue().decode("utf-8"))

# This modifies an existing published XForm to be in draft mode.
# An XForm must be in draft mode to upload an attachment.
if upload_media:
xform.uploadMedia(project_id, title, data, convert_to_draft_when_publishing)
# Upload the geojson of features to be modified
xform.uploadMedia(
project_id, title, str(geojson_file), convert_to_draft_when_publishing
)

# Delete temp geojson file
geojson_file.unlink(missing_ok=True)

result = xform.publishForm(project_id, title)
return result
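The updated create_odk_xform above now takes the feature GeoJSON as a BytesIO object and writes it to a temporary file before uploading it as form media (osm_fieldwork's uploadMedia still expects a file path, per the TODO). A hedged usage sketch, with parameter names taken from the diff and all values as placeholders:

import json
from io import BytesIO

from app.central.central_crud import create_odk_xform

# Placeholder FeatureCollection; in FMTM this would be the task-area data extract
feature_geojson = BytesIO(
    json.dumps({"type": "FeatureCollection", "features": []}).encode("utf-8")
)

create_odk_xform(
    project_id=1,                     # ODK Central project the form is created in
    xform_name="buildings",           # form name on Central (placeholder)
    filespec="/tmp/buildings.xml",    # path to the XForm definition (assumed here)
    feature_geojson=feature_geojson,  # uploaded as geojson form media
    odk_credentials=None,             # fall back to default Central credentials
)
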
28 changes: 1 addition & 27 deletions src/backend/app/db/db_models.py
@@ -40,7 +40,7 @@
desc,
)
from sqlalchemy.dialects.postgresql import ARRAY as PostgreSQLArray # noqa: N811
from sqlalchemy.dialects.postgresql import JSONB, TSVECTOR
from sqlalchemy.dialects.postgresql import TSVECTOR
from sqlalchemy.orm import (
# declarative_base,
backref,
@@ -691,32 +691,6 @@ class DbLicense(Base):
) # Many to Many relationship


class DbFeatures(Base):
"""Features extracted from osm data."""

__tablename__ = "features"

id = cast(int, Column(Integer, primary_key=True))
project_id = cast(int, Column(Integer, ForeignKey("projects.id")))
project = cast(DbProject, relationship(DbProject, backref="features"))

category_title = cast(
str, Column(String, ForeignKey("xlsforms.title", name="fk_xform"))
)
category = cast(DbXForm, relationship(DbXForm))
task_id = cast(int, Column(Integer, nullable=True))
properties = cast(dict, Column(JSONB))
geometry = cast(WKBElement, Column(Geometry(geometry_type="GEOMETRY", srid=4326)))

__table_args__ = (
ForeignKeyConstraint(
[task_id, project_id], ["tasks.id", "tasks.project_id"], name="fk_tasks"
),
Index("idx_features_composite", "task_id", "project_id"),
{},
)


class BackgroundTasks(Base):
"""Table managing long running background tasks."""

195 changes: 136 additions & 59 deletions src/backend/app/db/postgis_utils.py
@@ -116,74 +116,51 @@ async def geojson_to_flatgeobuf(
) -> Optional[bytes]:
"""From a given FeatureCollection, return a memory flatgeobuf obj.
NOTE this generate an fgb with string timestamps, not datetime.
NOTE ogr2ogr would generate datetime, but parsing does not seem to work.
Args:
db (Session): SQLAlchemy db session.
geojson (geojson.FeatureCollection): a FeatureCollection object.
Returns:
flatgeobuf (bytes): a Python bytes representation of a flatgeobuf file.
"""
# FIXME make this with with properties / tags
# FIXME this is important
# FIXME but difficult to guarantee users upload geojson
# FIXME With required properties included
# sql = """
# DROP TABLE IF EXISTS public.temp_features CASCADE;

# CREATE TABLE IF NOT EXISTS public.temp_features(
# geom geometry,
# osm_id integer,
# changeset integer,
# timestamp timestamp
# );

# WITH data AS (SELECT CAST(:geojson AS json) AS fc)
# INSERT INTO public.temp_features (geom, osm_id, changeset, timestamp)
# SELECT
# ST_SetSRID(ST_GeomFromGeoJSON(feat->>'geometry'), 4326) AS geom,
# COALESCE((feat->'properties'->>'osm_id')::integer, -1) AS osm_id,
# COALESCE((feat->'properties'->>'changeset')::integer, -1) AS changeset,
# CASE
# WHEN feat->'properties'->>'timestamp' IS NOT NULL
# THEN (feat->'properties'->>'timestamp')::timestamp
# ELSE NULL
# END AS timestamp
# FROM (
# SELECT json_array_elements(fc->'features') AS feat
# FROM data
# ) AS f;

# -- Second param = generate with spatial index
# SELECT ST_AsFlatGeobuf(geoms, true)
# FROM (SELECT * FROM public.temp_features) AS geoms;
# """
sql = """
DROP TABLE IF EXISTS public.temp_features CASCADE;
CREATE TABLE IF NOT EXISTS public.temp_features(
geom geometry
DROP TABLE IF EXISTS temp_features CASCADE;
-- Wrap geometries in GeometryCollection
CREATE TEMP TABLE IF NOT EXISTS temp_features(
geom geometry(GeometryCollection, 4326),
osm_id integer,
tags text,
version integer,
changeset integer,
timestamp text
);
WITH data AS (SELECT CAST(:geojson AS json) AS fc)
INSERT INTO public.temp_features (geom)
INSERT INTO temp_features
(geom, osm_id, tags, version, changeset, timestamp)
SELECT
ST_SetSRID(ST_GeomFromGeoJSON(feat->>'geometry'), 4326) AS geom
FROM (
SELECT json_array_elements(fc->'features') AS feat
FROM data
) AS f;
SELECT ST_AsFlatGeobuf(fgb_data, true)
FROM (SELECT * FROM public.temp_features as geoms) AS fgb_data;
ST_ForceCollection(ST_GeomFromGeoJSON(feat->>'geometry')) AS geom,
(feat->'properties'->>'osm_id')::integer as osm_id,
(feat->'properties'->>'tags')::text as tags,
(feat->'properties'->>'version')::integer as version,
(feat->'properties'->>'changeset')::integer as changeset,
(feat->'properties'->>'timestamp')::text as timestamp
FROM json_array_elements((SELECT fc->'features' FROM data)) AS f(feat);
-- Second param = generate with spatial index
SELECT ST_AsFlatGeobuf(geoms, true)
FROM (SELECT * FROM temp_features) AS geoms;
"""

# Run the SQL
result = db.execute(text(sql), {"geojson": json.dumps(geojson)})
# Get a memoryview object, then extract to Bytes
flatgeobuf = result.first()

# Cleanup table
# db.execute(text("DROP TABLE IF EXISTS public.temp_features CASCADE;"))

if flatgeobuf:
return flatgeobuf[0].tobytes()
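
The rewritten geojson_to_flatgeobuf wraps each geometry in a GeometryCollection via ST_ForceCollection and carries osm_id, tags, version, changeset, and timestamp into the FlatGeobuf schema. A minimal usage sketch, assuming an open SQLAlchemy session and a one-feature collection:

import geojson
from sqlalchemy.orm import Session

from app.db.postgis_utils import geojson_to_flatgeobuf


async def write_fgb_example(db: Session) -> None:
    featcol = geojson.FeatureCollection(
        [
            geojson.Feature(
                geometry=geojson.Point((85.3240, 27.7172)),
                properties={
                    "osm_id": 123,
                    "tags": "{}",
                    "version": 1,
                    "changeset": 1,
                    "timestamp": "2024-02-22T00:00:00",
                },
            )
        ]
    )

    fgb_bytes = await geojson_to_flatgeobuf(db, featcol)
    if fgb_bytes:
        # Write the bytes to disk for inspection (illustrative only)
        with open("/tmp/extract.fgb", "wb") as fgb_file:
            fgb_file.write(fgb_bytes)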

@@ -196,14 +196,15 @@ async def flatgeobuf_to_geojson(
) -> Optional[geojson.FeatureCollection]:
"""Converts FlatGeobuf data to GeoJSON.
Extracts single geometries from wrapped GeometryCollection if used.
Args:
db (Session): SQLAlchemy db session.
flatgeobuf (bytes): FlatGeobuf data in bytes format.
Returns:
geojson.FeatureCollection: A FeatureCollection object.
"""
# FIXME can we use SELECT * to extract all fields into geojson properties?
sql = text(
"""
DROP TABLE IF EXISTS public.temp_fgb CASCADE;
Expand All @@ -217,24 +195,23 @@ async def flatgeobuf_to_geojson(
FROM (
SELECT jsonb_build_object(
'type', 'Feature',
'geometry', ST_AsGeoJSON(fgb_data.geom)::jsonb,
'geometry', ST_AsGeoJSON(ST_GeometryN(fgb_data.geom, 1))::jsonb,
'properties', jsonb_build_object(
'osm_id', fgb_data.osm_id,
'tags', fgb_data.tags,
'version', fgb_data.version,
'changeset', fgb_data.changeset,
'timestamp', fgb_data.timestamp
)::jsonb
) AS feature
FROM (
SELECT
geom,
NULL as osm_id,
NULL as tags,
NULL as version,
NULL as changeset,
NULL as timestamp
osm_id,
tags,
version,
changeset,
timestamp
FROM ST_FromFlatGeobuf(null::temp_fgb, :fgb_bytes)
) AS fgb_data
) AS features;
@@ -247,7 +247,8 @@
except ProgrammingError as e:
log.error(e)
log.error(
"Attempted flatgeobuf --> geojson conversion, but duplicate column found"
"Attempted flatgeobuf --> geojson conversion failed. "
"Perhaps there is a duplicate 'id' column?"
)
return None

@@ -257,6 +257,105 @@
return None
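
flatgeobuf_to_geojson is the inverse helper: ST_GeometryN(geom, 1) unwraps the single geometry from the stored GeometryCollection and the stored properties are rebuilt into each feature. A sketch of reading fgb bytes back, assuming the bytes produced by the example above:

from sqlalchemy.orm import Session

from app.db.postgis_utils import flatgeobuf_to_geojson


async def read_fgb_example(db: Session, fgb_bytes: bytes) -> None:
    featcol = await flatgeobuf_to_geojson(db, fgb_bytes)
    if featcol is None:
        raise ValueError("FlatGeobuf parsing failed (see duplicate 'id' note above)")

    for feature in featcol["features"]:
        # Geometries come back unwrapped from their GeometryCollection
        print(feature["geometry"]["type"], feature["properties"]["osm_id"])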


async def split_geojson_by_task_areas(
db: Session,
featcol: geojson.FeatureCollection,
project_id: int,
) -> Optional[dict[int, geojson.FeatureCollection]]:
"""Split GeoJSON into tagged task area GeoJSONs.
Args:
db (Session): SQLAlchemy db session.
featcol (geojson.FeatureCollection): Data extract feature collection.
project_id (int): The project ID for associated tasks.
Returns:
dict[int, geojson.FeatureCollection]: {task_id: FeatureCollection} mapping.
"""
sql = text(
"""
-- Drop table if already exists
DROP TABLE IF EXISTS temp_features CASCADE;
-- Create a temporary table to store the parsed GeoJSON features
CREATE TEMP TABLE temp_features (
id SERIAL PRIMARY KEY,
geometry GEOMETRY,
properties JSONB
);
-- Insert parsed geometries and properties into the temporary table
INSERT INTO temp_features (geometry, properties)
SELECT
ST_SetSRID(ST_GeomFromGeoJSON(feature->>'geometry'), 4326) AS geometry,
jsonb_set(
jsonb_set(feature->'properties', '{task_id}', to_jsonb(tasks.id), true),
'{project_id}', to_jsonb(tasks.project_id), true
) AS properties
FROM (
SELECT jsonb_array_elements(CAST(:geojson_featcol AS jsonb)->'features')
AS feature
) AS features
CROSS JOIN tasks
WHERE tasks.project_id = :project_id;
-- Retrieve task outlines based on the provided project_id
WITH task_outlines AS (
SELECT id, outline
FROM tasks
WHERE project_id = :project_id
)
SELECT
task_outlines.id AS task_id,
jsonb_build_object(
'type', 'FeatureCollection',
'features', jsonb_agg(features.feature)
) AS task_features
FROM
task_outlines
LEFT JOIN LATERAL (
-- Construct a feature collection with geometries per task area
SELECT
jsonb_build_object(
'type', 'Feature',
'geometry', ST_AsGeoJSON(temp_features.geometry)::jsonb,
'properties', temp_features.properties
) AS feature
FROM
temp_features
WHERE
ST_Within(temp_features.geometry, task_outlines.outline)
) AS features ON true
GROUP BY
task_outlines.id;
"""
)

try:
result = db.execute(
sql,
{
"geojson_featcol": json.dumps(featcol),
"project_id": project_id,
},
)
feature_collections = result.all()

except ProgrammingError as e:
log.error(e)
log.error("Attempted geojson task splitting failed")
return None

if feature_collections:
task_geojson_dict = {
record[0]: geojson.loads(json.dumps(record[1]))
for record in feature_collections
}
return task_geojson_dict

return None
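
split_geojson_by_task_areas returns a {task_id: FeatureCollection} mapping, with task_id and project_id injected into each feature's properties, which the backend can then use to generate per-task GeoJSON form media as described in the commit message. A usage sketch under those assumptions:

import geojson
from loguru import logger as log
from sqlalchemy.orm import Session

from app.db.postgis_utils import split_geojson_by_task_areas


async def split_extract_example(
    db: Session, project_id: int, featcol: geojson.FeatureCollection
) -> dict:
    task_featcols = await split_geojson_by_task_areas(db, featcol, project_id)
    if not task_featcols:
        return {}

    for task_id, task_featcol in task_featcols.items():
        # Each task area gets its own FeatureCollection, with task_id / project_id
        # added to the feature properties by the SQL above
        feature_count = len(task_featcol.get("features") or [])
        log.info(f"task {task_id}: {feature_count} features in extract")

    return task_featcols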


def parse_and_filter_geojson(
geojson_str: str, filter: bool = True
) -> Optional[geojson.FeatureCollection]:
