Configured pk must be unique, fail otherwise. When no pk configured, provide option to drop duplicates (equivalent geoms), otherwise just warn
smnorris committed Sep 26, 2024
1 parent 163859c commit 1f7b77c
Showing 3 changed files with 51 additions and 39 deletions.
pyproject.toml (2 changes: 1 addition, 1 deletion)
@@ -36,7 +36,7 @@ test = [
 ]
 
 [project.scripts]
-fit_downloader = "fit_downloader.cli:cli"
+fit_downloader = "fit_opendatadownloader.cli:cli"
 
 [project.urls]
 Homepage = "https://github.com/bcgov/fit_opendatadownloader"
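The pyproject.toml fix points the fit_downloader console script at the real package module (fit_opendatadownloader.cli) instead of the nonexistent fit_downloader module. As a minimal sketch for sanity-checking the entry point after reinstalling (assumes Python 3.10+ and the package installed in the active environment):

    # Hedged sketch: confirm the console script resolves to the fixed module path.
    from importlib.metadata import entry_points

    eps = [ep for ep in entry_points(group="console_scripts") if ep.name == "fit_downloader"]
    assert eps, "entry point not registered - is the package installed?"
    print(eps[0].value)  # expect: fit_opendatadownloader.cli:cli
    eps[0].load()        # raises ModuleNotFoundError if the target module is missing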
src/fit_opendatadownloader/data_source.py (79 changes: 45 additions, 34 deletions)
@@ -79,19 +79,26 @@ def download(self):
         return df
 
 
-def clean(df, fields, primary_key, precision=0.01, fdl_primary_key="fdl_load_id"):
+def clean(
+    df,
+    fields,
+    primary_key=None,
+    precision=0.01,
+    fdl_primary_key="fdl_load_id",
+    drop_geom_duplicates=False,
+):
     """
     Standardize a geodataframe, confirming:
     - geometries are BC Albers
     - geometries are of supported spatial type
-    - if multipart geometries are present, ensure *all* geometries are multipart
+    - if multipart geometries are present, promote *all* geometries to multipart
     - clean field names (lowercase, no special characters or spaces)
     - remove any fields not included in config fields key
-    - drop duplicates (considering retained fields and geometries, with reduced precision if specified)
-    - if primary key is provided, validate it is unique and add geometry to pk if it is not
-    - if primary key is not provided, default to using the geometry as pk (detecting changes to attributes)
-    - hash the primary key into a new column, to use as key for generating diffs
+    - if primary key is provided, validate to ensure it is unique
+    - if primary key is not provided:
+      - create hash of the geometry to use as primary key (defaulting to 1cm coordinate precision)
+      - optionally, drop duplicate records (based on the geometry hash key)
     """
     # reproject to BC Albers if necessary
     if df.crs != CRS.from_user_input(3005):
@@ -113,45 +120,49 @@ def clean(df, fields, primary_key, precision=0.01, fdl_primary_key="fdl_load_id"):
     # drop any columns not listed in config (minus geometry)
     df = df[fields + ["geometry"]]
 
-    # normalize the geometries
-    df["geometry"] = df["geometry"].normalize()
-
-    # then add a reduced precision geometry column based on the normalized geoms
-    df["geometry_p"] = df["geometry"].set_precision(precision, mode="pointwise")
-
-    # drop duplicates based on all retained fields and this geometry
-    if len(df) != len(df.drop_duplicates(subset=fields + ["geometry_p"])):
-        n_dropped = len(df) - len(df.drop_duplicates(subset=fields + ["geometry_p"]))
-        df = df.drop_duplicates(subset=fields + ["geometry_p"])
-        LOG.warning(
-            f"Dropped {n_dropped} duplicate rows (equivalent attributes and geometries)"
-        )
-
     # check and fix spatial types (working with original geometries)
     df = standardize_spatial_types(df)
 
-    # process primary keys, adding geometry if they are not unique
-    pks = None
+    # validate primary keys - they must be unique
+    pks = []
     if primary_key:
         # swap provided pk names to cleaned column names
         pks = [cleaned_column_map[k] for k in primary_key]
-        # if pk values are not unique, add the reduced precision geometry to the pk
+        # fail if pk values are not unique
         if len(df) != len(df[pks].drop_duplicates()):
             pk_string = ",".join(pks)
-            LOG.warning(
-                f"Duplicate values exist for primary_key {pk_string}, adding geometries to primary key"
-            )
-            pks = pks + ["geometry_p"]
+            raise ValueError(f"Duplicate values exist for primary_key {pk_string}")
 
-    # if no pk provided, default to using the reduced precision geometry
-    else:
-        pks = ["geometry_p"]
+        # to keep things as simple as possible, always create a hashed key based
+        # on the supplied primary key - the same column can then always be used
+        # as pk when running change detection
+        LOG.info(
+            f"Adding hashed key {fdl_primary_key}, based on hash of provided primary_key {','.join(pks)}"
+        )
+        df = fcd.add_hash_key(df, fdl_primary_key, fields=pks, hash_geometry=False)
+        pks = [fdl_primary_key]
 
-    # add pk based on hash
-    df = fcd.add_synthetic_primary_key(df, columns=pks, new_column=fdl_primary_key)
+    # if no primary key provided, just use the geometry
+    else:
+        LOG.info(f"Adding hashed key {fdl_primary_key}, based on hash of geometry")
+        df = fcd.add_hash_key(
+            df, fdl_primary_key, hash_geometry=True, precision=precision
+        )
+        pks = [fdl_primary_key]
 
+    # duplicates could be present if using the geometry hash as pk;
+    # report on duplicates and drop (if specified)
+    n_duplicates = len(df) - len(df.drop_duplicates(subset=pks))
+    if n_duplicates > 0:
+        LOG.warning(f"{n_duplicates} duplicates are present in data")
+        if drop_geom_duplicates:
+            df = df.drop_duplicates(subset=pks)
+            LOG.warning(
+                f"Dropped {n_duplicates} duplicate rows (equivalent geometries)"
+            )
 
     # retain only the hashed key, the configured fields, and the geometry
     df = df[[fdl_primary_key] + fields + ["geometry"]]
     return df


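The net effect in data_source.py: a configured primary key that is not unique now fails the load outright, while with no key configured the geometry hash becomes the primary key and equivalent geometries are dropped only on request. A hedged sketch of both paths on a toy GeoDataFrame (the columns and values here are hypothetical, not from the repo; assumes geopandas, shapely, and the package are installed):

    # Hedged sketch: exercise both clean() paths on a toy frame (hypothetical data).
    import geopandas as gpd
    import fit_opendatadownloader as fdl
    from shapely.geometry import Point

    df = gpd.GeoDataFrame(
        {"park_id": [1, 1], "park_name": ["Stanley", "Stanley"]},
        geometry=[Point(1200000, 450000), Point(1200000, 450000)],
        crs="EPSG:3005",  # already BC Albers, so no reprojection occurs
    )

    # path 1: a configured pk must be unique - the duplicated park_id raises
    try:
        fdl.clean(df, fields=["park_id", "park_name"], primary_key=["park_id"])
    except ValueError as err:
        print(err)  # Duplicate values exist for primary_key park_id

    # path 2: no pk - fdl_load_id is a hash of the geometry; the duplicate
    # row is dropped only because drop_geom_duplicates is set
    out = fdl.clean(df, fields=["park_name"], drop_geom_duplicates=True)
    assert len(out) == 1 and "fdl_load_id" in out.columns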
tests/test_download.py (9 changes: 5 additions, 4 deletions)
@@ -139,7 +139,9 @@ def test_clean_columns():
     ]
     layer = fdl.parse_config(sources)[0]
     df = layer.download()
-    df = fdl.clean(df, layer.fields, layer.primary_key, precision=0.1)
+    df = fdl.clean(
+        df, fields=layer.fields, primary_key=layer.primary_key, precision=0.1
+    )
     assert "airport_name_" in df.columns
 
 
@@ -160,14 +162,13 @@ def test_hash_pk():
                 "POSTAL_CODE",
                 "LOCALITY",
             ],
-            "primary_key": ["SOURCE_DATA_ID"],
             "schedule": "Q",
         }
     ]
     layer = fdl.parse_config(sources)[0]
     df = layer.download()
-    df = fdl.clean(df, layer.fields, layer.primary_key, precision=0.1)
-    assert df["fdl_load_id"].iloc[0] == "51eac6b471a284d3341d8c0c63d0f1a286262a18"
+    df = fdl.clean(df, layer.fields, precision=0.1)
+    assert df["fdl_load_id"].iloc[0] == "597b8d8bef757cb12fec15ce027fb2c6f84775d7"
 
 
 def test_mixed_types():
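The expected digest in test_hash_pk changes because fdl_load_id is now derived from the geometry alone rather than from the dropped SOURCE_DATA_ID key. The 40-character hex values point at sha1; purely as an illustration of the idea (this is not the fit_changedetector implementation), a precision-reduced geometry hash could be built like this:

    # Illustration only - one way to hash a geometry at reduced precision so
    # that near-identical geometries share a key (assumes shapely >= 2.0).
    import hashlib
    import shapely

    def geometry_hash(geom, precision=0.01):
        # normalize part/ring order, snap coordinates to the precision grid,
        # then hash the canonical WKB bytes
        snapped = shapely.set_precision(shapely.normalize(geom), precision)
        return hashlib.sha1(shapely.to_wkb(snapped)).hexdigest()

    print(geometry_hash(shapely.Point(1200000.004, 450000.001)))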
