
Commit

make data sources more flexible, add tiles script and download supporting datasets
smnorris committed Aug 4, 2024
1 parent 0b9986e commit 0da5bb8
Showing 6 changed files with 475 additions and 483 deletions.
README.md: 2 changes, 1 addition & 1 deletion
@@ -2,4 +2,4 @@

[![Lifecycle:Experimental](https://img.shields.io/badge/Lifecycle-Experimental-339999)](https://github.com/bcgov/repomountie/blob/master/doc/lifecycle-badges.md)

Extract data from provincial inventories to quantify how much of British Columbia's provincially administered forested land is under some type of forest harvest management regime.
Quantify how much of British Columbia's provincially administered forested land is under some type of forest harvest management regime.
harvest_restrictions.py: 229 changes, 123 additions & 106 deletions
@@ -62,60 +62,75 @@ def parse_sources(sources):
return parsed


def validate_file(layer):
def validate_file(source):
"""simple validation of file based sources
- file exists
- schema is as expected
"""
source = layer["source"]
alias = layer["alias"]
query = layer["query"]

# load file
df = geopandas.read_file(
os.path.expandvars(source), layer=layer["layer"], where=query
os.path.expandvars(source["source"]),
layer=source["layer"],
where=source["query"],
)

# are expected columns present?
df.columns = [x.lower() for x in df.columns]
for col in ["primary_key", "name_column"]:
if layer[col]:
column = layer[col].lower()
if column not in df.columns:
raise ValueError(f"{alias} - {column} is not present in source")
columns = [x.lower() for x in df.columns]
if source["primary_key"]:
if source["primary_key"] not in columns:
raise ValueError(
f"Primary key - {source['primary_key']} is not present in source"
)
for column in source["field_mapper"].values():
if column.lower() not in columns:
raise ValueError(
f"Column {column} is not present in source, modify config 'field_mapper'"
)

# is there data?
alias = source["alias"]
if len(df.index) == 0:
raise ValueError(f"{alias} - no data returned for given source and query")

# presume layer is defined correctly if no errors are raised
LOG.info(f"{alias} - validates successfully")


def validate_bcgw(layer):
def validate_bcgw(source):
"""validate bcdata sources against bcdc api and wfs"""
# does source exist as written?
alias = layer["alias"]
table = layer["source"].upper()
query = layer["query"]
alias = source["alias"]
table = source["source"].upper()
query = source["query"]
if table not in bcdata.list_tables():
raise ValueError(
f"{alias} - {table} does not exist in BCGW or is not available via WFS"
)

# do specified name/pk columns exist in source?
# get columns present in source from data catalogue
table_def = bcdata.get_table_definition(table)
for col in ["primary_key", "name_column"]:
if layer[col]:
column = layer[col].upper()
if column not in [c["column_name"] for c in table_def["schema"]]:
columns = [c["column_name"] for c in table_def["schema"]]

# is primary key present?
if source["primary_key"]:
if source["primary_key"] not in columns:
raise ValueError(
f"Primary key - {source['primary_key']} is not present in source {table}"
)

# are other required columns in field mapping present?
for column in source["field_mapper"].values():
if (
column
): # allow null source columns (adds the new column, but with no values from source)
if column.upper() not in columns:
raise ValueError(
f"{alias} - {column} is not present in BCGW table {table}"
f"Column {column} is not present in source, modify config 'field_mapper'"
)

# does query return values?
if layer["query"]:
if bcdata.get_count(table, query=layer["query"]) == 0:
if source["query"]:
if bcdata.get_count(table, query=source["query"]) == 0:
raise ValueError(
f"{alias} - provided query {query} returns no data for {table}"
)
@@ -149,40 +164,43 @@ def validate_sources(sources, validate_data=True, alias=None):
Validate json, whether data sources exist, and assign hierarchy index
based on position in list
"""
for layer in sources:
if layer["source_type"] == "BCGW":
validate_bcgw(layer)
elif layer["source_type"] == "FILE":
validate_file(layer)
for source in sources:
if source["source_type"] == "BCGW":
validate_bcgw(source)
elif source["source_type"] == "FILE":
validate_file(source)

LOG.info("Validation successful - all layers appear valid")

# return validated (and indexed/dated) sources as ordered dictionary
return sources


def download_source(layer, out_path="data", out_table="designations_cleaned"):
"""download layer from source and save to parquet in out_path"""

table, alias = (layer["source"], layer["alias"])
def download(source):
"""download layer from source to a standardized geodataframe"""

# download WFS
if layer["source_type"] == "BCGW":
if source["source_type"] == "BCGW":
df = bcdata.get_data(
table,
source["source"],
crs="EPSG:3005",
query=layer["query"],
query=source["query"],
as_gdf=True,
lowercase=True,
)

# download file
elif layer["source_type"] == "FILE":
elif source["source_type"] == "FILE":
df = geopandas.read_file(
os.path.expandvars(layer["source"]),
layer=layer["layer"],
where=layer["query"],
os.path.expandvars(source["source"]),
layer=source["layer"],
where=source["query"],
)
if not df.crs:
raise ValueError(
"Source does not have a defined projection/coordinate reference system"
)
# reproject to BC Albers if necessary
if df.crs != CRS.from_user_input(3005):
df = df.to_crs("EPSG:3005")
# lowercasify column names
@@ -192,74 +210,46 @@ def download_source(layer, out_path="data", out_table="designations_cleaned"):
df = df.rename_geometry("geom")
df = to_multipart(df) # sources can have mixed types, just make everything multi

# add new columns, prefixing and suffixing with "__" to avoid collisions
df["__index__"] = layer["index"]
df["__designation__"] = layer["designation"]
df["__alias__"] = layer["alias"].lower()
df["__harvest_restriction__"] = layer["harvest_restriction"]
df["__og_restriction__"] = layer["og_restriction"]
df["__mining_restriction__"] = layer["mining_restriction"]
# load pk/name if present, otherwise set to empty string
if layer["primary_key"]:
df["__primary_key__"] = layer["primary_key"]
# standardize columns, adding data as required
df["__index__"] = source["index"]
df["__description__"] = source["description"]
df["__alias__"] = source["alias"].lower()
if source["primary_key"]:
df["__primary_key__"] = source["primary_key"]
else:
df["__primary_key__"] = ""
if layer["name_column"]:
df["__name__"] = df[layer["name_column"].lower()]
else:
df["__name__"] = ""

# retain only columns of interest
df = df[
[
"__index__",
"__designation__",
"__alias__",
"__harvest_restriction__",
"__og_restriction__",
"__mining_restriction__",
"__primary_key__",
"__name__",
"geom",
]
]
# rename remaining columns
df = df.rename(
columns={
"__index__": "index",
"__designation__": "designation",
"__alias__": "alias",
"__harvest_restriction__": "harvest_restriction",
"__og_restriction__": "og_restriction",
"__mining_restriction__": "mining_restriction",
"__primary_key__": "source_primary_key",
"__name__": "source_name",
}

# rename columns that we want to retain
for key, value in source["field_mapper"].items():
if value:
df["__" + key + "__"] = df[
value.lower()
] # all incoming data is already lowercasified
else:
df["__" + key + "__"] = None

# add additional constant data
if source["data"]:
for key, value in source["data"].items():
df["__" + key + "__"] = value

# retain only columns that have just been added
columns = (
["index", "description", "alias", "primary_key"]
+ list(source["field_mapper"])
+ list(source["data"])
)
df = df[["__" + c + "__" for c in columns] + ["geom"]]

# dump to file if out_path specified
if out_path:
out_file = os.path.join(
out_path,
(
"rr_"
+ str(layer["index"]).zfill(2)
+ "_"
+ layer["alias"].lower()
+ ".parquet"
),
)
LOG.info(f"Writing {alias} to {out_file}")
df.to_parquet(out_file)
# strip the __ prefix/suffix
df = df.rename(columns={"__" + c + "__": c for c in columns})

# load to postgres, writing everything to the same initial table
LOG.info(f"Writing {alias} to postgres")
df.to_postgis(out_table, DB, if_exists="append")
return df
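
For reference, a single parsed entry in the sources config would look roughly like the sketch below: a Python dict carrying the keys that validate_file(), validate_bcgw() and download() read above. The alias, table, column names and constant values shown are illustrative assumptions only, and the hierarchy index is assigned separately from the entry's position in the sources list.

# a minimal sketch of one sources entry, assuming this structure from the keys
# referenced above - alias, table and column names are hypothetical
example_source = {
    "alias": "national_park",            # short name used for logging and output file names
    "description": "National Park",      # label written to the output "description" column
    "source_type": "BCGW",               # "BCGW" (downloaded via WFS) or "FILE"
    "source": "WHSE_ADMIN_BOUNDARIES.CLAB_NATIONAL_PARKS",  # BCGW table (or file path for FILE sources)
    "layer": None,                       # layer within the file, FILE sources only
    "query": None,                       # optional filter applied on validation/download
    "primary_key": None,                 # optional primary key column present in the source
    "field_mapper": {"name": "ENGLISH_NAME"},  # output column -> source column (None creates an empty column)
    "data": {"harvest_restriction": 1},  # constant values added as columns to every feature
}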


def process(out_path):
"""clean and overlay input data, dump to file if specified"""
DB.execute("create table designations_cleaned ")
# def process(out_path):
# """clean and overlay input data, dump to file if specified"""
# DB.execute("create table designations_cleaned ")


@click.group()
@@ -317,7 +307,7 @@ def validate(alias, sources_file, verbose, quiet):
)
@verbose_opt
@quiet_opt
def download(alias, sources_file, out_path, no_validate, verbose, quiet):
def setup(alias, sources_file, out_path, no_validate, verbose, quiet):
configure_logging((verbose - quiet))

# load sources file
@@ -334,10 +324,37 @@ def download(alias, sources_file, out_path, no_validate, verbose, quiet):
sources = validate_sources(sources)

# download each data source
for layer in sources:
download_source(layer, out_path)

# download supporting datasets
for source in sources:
df = download(source)

# load to postgres, writing everything to the same initial table
LOG.info(f"Writing {source['alias']} to postgres")
df.to_postgis("restrictions_source", DB, if_exists="append")

# dump to file if out_path specified
if out_path:
out_file = os.path.join(
out_path,
(
"rr_"
+ str(source["index"]).zfill(2)
+ "_"
+ source["alias"].lower()
+ ".parquet"
),
)
LOG.info(f"Writing {alias} to {out_file}")
df.to_parquet(out_file)

# download additional supporting datasets
if not alias:
for table in [
"WHSE_BASEMAPPING.BCGS_20K_GRID",
"WHSE_WILDLIFE_MANAGEMENT.CRIMS_MARINE_ECOSECTION",
"WHSE_LEGAL_ADMIN_BOUNDARIES.ABMS_PROVINCE_SP",
"WHSE_BASEMAPPING.NTS_250K_GRID",
]:
bcdata.bc2pg(table, os.environ["DATABASE_URL"])


if __name__ == "__main__":
