diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 5724e1c..f1baeec 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -16,7 +16,7 @@ jobs:
    strategy:
      fail-fast: false
      matrix:
-        python-version: ["3.10"]
+        python-version: ["3.12"]
    steps:
    - name: Install system
diff --git a/README.md b/README.md
index bc552b6..2974a93 100644
--- a/README.md
+++ b/README.md
@@ -23,13 +23,19 @@ There are some additional inputs required to run the pipeline, which should be p
The script also assumes you have a Postgres database with the IUCN Redlist database in it.
+
+## Species data acquisition
+
+There are two scripts for getting the species data from the Redlist. For those in the IUCN with access to the database version of the Redlist, use `extract_species_data_psql.py`.
+
+For those outside the IUCN, there is a script called `extract_species_data_redlist.py` that gets the data via the [V4 Redlist API](https://api.iucnredlist.org). You will need an API key, which you can request via the API website [by signing up](https://api.iucnredlist.org/users/sign_up). Once you have that, you will still need to download the range maps yourself, as those are not available from the API: before running the script you must [go to the spatial data portal](https://www.iucnredlist.org/resources/spatial-data-download) and download the files for the taxa you are interested in.
+
## Running the pipeline

There are two ways to run the pipeline. The easiest way is to use Docker if you have it available to you, as it will manage all the dependencies for you. But you can check out and run it locally if you want to also, but it requires a little more effort.

### Running with Docker
-
There is included a docker file, which is based on the GDAL container image, which is set up to install everything ready to use. You can build that using:

```shell
@@ -64,9 +70,7 @@ With those you should set up a Python virtual environment to install all the req
```shell
$ python3 -m venv ./venv
$ . ./venv/bin/activate
-(venv) $ gdalinfo --version
-GDAL 3.11.3 "Eganville", released 2025/07/12
-(venv) $ pip install gdal[numpy]==3.11.3
+(venv) $ pip install gdal[numpy]==`gdal-config --version`
...
(venv) $ pip install -r requirements.txt
```
diff --git a/prepare_species/common.py b/prepare_species/common.py
new file mode 100644
index 0000000..b8bb284
--- /dev/null
+++ b/prepare_species/common.py
@@ -0,0 +1,229 @@
+import os
+from pathlib import Path
+from typing import Any, Optional
+
+import aoh
+import geopandas as gpd
+import pyproj
+import shapely
+
+# To match the FABDEM elevation map we use
+# different range min/max/separation
+ELEVATION_MAX = 8580
+ELEVATION_MIN = -427
+ELEVATION_SPREAD = 12
+
+COLUMNS = [
+    "id_no",
+    "assessment_id",
+    "assessment_year",
+    "season",
+    "systems",
+    "elevation_lower",
+    "elevation_upper",
+    "full_habitat_code",
+    "scientific_name",
+    "family_name",
+    "class_name",
+    "threats",
+    "category",
+    "category_weight",
+    "geometry"
+]
+
+# From Muir et al: For each species, a global STAR threat abatement (START) score
+# is defined.
This varies from zero for species of Least Concern to 100 +# for Near Threatened, 200 for Vulnerable, 300 for Endangered and +# 400 for Critically Endangered species (using established weighting +# ratios7,8) +CATEGORY_WEIGHTS = { + 'NT': 100, + 'VU': 200, + 'EN': 300, + 'CR': 400, +} + +# Mapping from API scope values to STAR scope indices +SCOPES = [ + "whole (>90%)", + "majority (50-90%)", + "minority (<50%)" +] +DEFAULT_SCOPE = "majority (50-90%)" + +# Mapping from API severity values to STAR severity indices +SEVERITIES = [ + "very rapid declines", + "rapid declines", + "slow, significant declines", + "negligible declines", + "no decline", + "causing/could cause fluctuations" +] +DEFAULT_SEVERITY = "slow, significant declines" + +# Taken from Muir et al 2021 Supplementary Table 2, indexed by SCOPE and then SEVERITY +THREAT_WEIGHTING_TABLE = [ + [63, 24, 10, 1, 0, 10], + [52, 18, 9, 0, 0, 9], + [24, 7, 5, 0, 0, 5], +] + +class SpeciesReport: + + REPORT_COLUMNS = [ + "id_no", + "assessment_id", + "scientific_name", + "has_api_data", + "possibly_extinct", + "has_systems", + "not_terrestrial_system", + "has_threats", + "has_habitats", + "keeps_habitats", + "has_geometries", + "keeps_geometries", + "filename", + ] + + def __init__(self, id_no, assessment_id, scientific_name): + self.info = {k: False for k in self.REPORT_COLUMNS} + self.id_no = id_no + self.assessment_id = assessment_id + self.scientific_name = scientific_name + + def __getstate__(self): + return vars(self) + + def __setstate__(self, state): + vars(self).update(state) + + def __setattr__(self, name: str, value: Any) -> None: + if name in self.REPORT_COLUMNS: + self.info[name] = value + super().__setattr__(name, value) + + def __getattr__(self, name: str) -> Any: + if name in self.REPORT_COLUMNS: + return self.info[name] + return None + + def as_row(self) -> list: + return [self.info[k] for k in self.REPORT_COLUMNS] + +def process_geometries( + geometries_data: list[shapely.Geometry], + report: SpeciesReport, +) -> shapely.Geometry: + if len(geometries_data) == 0: + raise ValueError("No geometries in DB") + report.has_geometries = True + + geometry = None + for geom in geometries_data: + grange = shapely.normalize(geom) + if grange.area == 0.0: + continue + + if geometry is None: + geometry = grange + else: + geometry = shapely.union(geometry, grange) + + if geometry is None: + raise ValueError("None geometry data in DB") + report.keeps_geometries = True + + return geometry + + +def process_systems( + systems_data: list[tuple], + report: SpeciesReport, +) -> list: + if len(systems_data) == 0: + raise ValueError("No systems found") + if len(systems_data) > 1: + raise ValueError("More than one systems aggregation found") + systems = systems_data[0][0] + if systems is None: + raise ValueError("no systems info") + report.has_systems = True + + if "Terrestrial" not in systems: + raise ValueError("No Terrestrial in systems") + report.not_terrestrial_system = True + + return systems + +def process_threats( + threat_data: list[tuple[int, str, str]], + report: SpeciesReport, +) -> list[tuple[int, int]]: + cleaned_threats = [] + for code, scope, severity in threat_data: + if scope is None or scope.lower() == "unknown": + scope = DEFAULT_SCOPE + if severity is None or severity.lower() == "unknown": + severity = DEFAULT_SEVERITY + scope_index = SCOPES.index(scope.lower()) + severity_index = SEVERITIES.index(severity.lower()) + score = THREAT_WEIGHTING_TABLE[scope_index][severity_index] + if score > 0: + cleaned_threats.append((code, 
score)) + report.has_threats = len(cleaned_threats) > 0 + return cleaned_threats + +def process_habitats( + habitats_data: list[list[str]], + report: SpeciesReport, +) -> set: + if len(habitats_data) == 0: + # Promote to "Unknown" + habitats_data = [["18"]] + else: + report.has_habitats = True + if len(habitats_data) > 1: + raise ValueError("Expected only one habitat row") + + habitats = set() + for habitat_values_row in habitats_data: + assert len(habitat_values_row) == 1 + habitat_values = habitat_values_row[0] + + if habitat_values is None: + habitat_values = "18" + habitat_set = {x for x in habitat_values.split('|') if x} + habitats |= habitat_set + + if len(habitats) == 0: + raise ValueError("No filtered habitats") + report.keeps_habitats = True + + return habitats + + +def tidy_reproject_save( + gdf: gpd.GeoDataFrame, + report: SpeciesReport, + output_directory_path: Path, + target_projection: Optional[str], +) -> None: + """Tidy the data, reproject it, and save to GeoJSON.""" + src_crs = pyproj.CRS.from_epsg(4326) + target_crs = pyproj.CRS.from_string(target_projection) if target_projection else src_crs + + graw = gdf.loc[0].copy() + grow = aoh.tidy_data( + graw, + elevation_max=ELEVATION_MAX, + elevation_min=ELEVATION_MIN, + elevation_seperation=ELEVATION_SPREAD, + ) + os.makedirs(output_directory_path, exist_ok=True) + output_path = output_directory_path / f"{grow.id_no}.geojson" + res = gpd.GeoDataFrame(grow.to_frame().transpose(), crs=src_crs, geometry="geometry") + res_projected = res.to_crs(target_crs) + res_projected.to_file(output_path, driver="GeoJSON") + report.filename = output_path diff --git a/prepare_species/extract_species_data_psql.py b/prepare_species/extract_species_data_psql.py index 43b65c7..c282028 100644 --- a/prepare_species/extract_species_data_psql.py +++ b/prepare_species/extract_species_data_psql.py @@ -1,3 +1,4 @@ + import argparse import json import logging @@ -6,57 +7,29 @@ from functools import partial from multiprocessing import Pool from pathlib import Path -from typing import Any, Optional +from typing import Optional -import aoh import geopandas as gpd import pandas as pd -import pyproj import psycopg2 import shapely from postgis.psycopg import register +from common import ( + CATEGORY_WEIGHTS, + COLUMNS, + process_habitats, + process_threats, + process_systems, + process_geometries, + tidy_reproject_save, + SpeciesReport, +) logger = logging.getLogger(__name__) logging.basicConfig() logger.setLevel(logging.DEBUG) -# To match the FABDEM elevation map we use -# different range min/max/separation -ELEVATION_MAX = 8580 -ELEVATION_MIN = -427 -ELEVATION_SPREAD = 12 - -COLUMNS = [ - "id_no", - "assessment_id", - "assessment_year", - "season", - "systems", - "elevation_lower", - "elevation_upper", - "full_habitat_code", - "scientific_name", - "family_name", - "class_name", - "threats", - "category", - "category_weight", - "geometry" -] - -# From Muir et al: For each species, a global STAR threat abatement (START) score -# is defined. 
This varies from zero for species of Least Concern to 100 -# for Near Threatened, 200 for Vulnerable, 300 for Endangered and -# 400 for Critically Endangered species (using established weighting -# ratios7,8) -CATEGORY_WEIGHTS = { - 'NT': 100, - 'VU': 200, - 'EN': 300, - 'CR': 400, -} - MAIN_STATEMENT = """ SELECT assessments.sis_taxon_id as id_no, @@ -142,189 +115,6 @@ f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" ) -class SpeciesReport: - - REPORT_COLUMNS = [ - "id_no", - "assessment_id", - "scientific_name", - "possibly_extinct", - "has_systems", - "not_terrestrial_system", - "has_threats", - "has_habitats", - "keeps_habitats", - "has_geometries", - "keeps_geometries", - "filename", - ] - - def __init__(self, id_no, assessment_id, scientific_name): - self.info = {k: False for k in self.REPORT_COLUMNS} - self.id_no = id_no - self.assessment_id = assessment_id - self.scientific_name = scientific_name - - def __getstate__(self): - return vars(self) - - def __setstate__(self, state): - vars(self).update(state) - - def __setattr__(self, name: str, value: Any) -> None: - if name in self.REPORT_COLUMNS: - self.info[name] = value - super().__setattr__(name, value) - - def __getattr__(self, name: str) -> Any: - if name in self.REPORT_COLUMNS: - return self.info[name] - return None - - def as_row(self) -> list: - return [self.info[k] for k in self.REPORT_COLUMNS] - -def tidy_reproject_save( - gdf: gpd.GeoDataFrame, - report: SpeciesReport, - output_directory_path: Path, - target_projection: Optional[str], -) -> None: - src_crs = pyproj.CRS.from_epsg(4326) - target_crs = pyproj.CRS.from_string(target_projection) if target_projection else src_crs - - graw = gdf.loc[0].copy() - grow = aoh.tidy_data( - graw, - elevation_max=ELEVATION_MAX, - elevation_min=ELEVATION_MIN, - elevation_seperation=ELEVATION_SPREAD, - ) - os.makedirs(output_directory_path, exist_ok=True) - output_path = output_directory_path / f"{grow.id_no}.geojson" - res = gpd.GeoDataFrame(grow.to_frame().transpose(), crs=src_crs, geometry="geometry") - res_projected = res.to_crs(target_crs) - res_projected.to_file(output_path, driver="GeoJSON") - report.filename = output_path - -def process_systems( - systems_data: list[tuple], - report: SpeciesReport, -) -> list: - if len(systems_data) == 0: - raise ValueError("No systems found") - if len(systems_data) > 1: - raise ValueError("More than one systems aggregation found") - systems = systems_data[0][0] - if systems is None: - raise ValueError("no systems info") - report.has_systems = True - - if "Terrestrial" not in systems: - raise ValueError("No Terrestrial in systems") - report.not_terrestrial_system = True - - return systems - -SCOPES = [ - "whole (>90%)", - "majority (50-90%)", - "minority (<50%)" -] -DEFAULT_SCOPE = "majority (50-90%)" -SEVERITIES = [ - "very rapid declines", - "rapid declines", - "slow, significant declines", - "negligible declines", - "no decline", - "causing/could cause fluctuations" -] -DEFAULT_SEVERITY = "slow, significant declines" - -# Taken from Muir et al 2021 Supplementary Table 2, indexed by SCOPE and then SEVERITY -THREAT_WEIGHTING_TABLE = [ - [63, 24, 10, 1, 0, 10], - [52, 18, 9, 0, 0, 9], - [24, 7, 5, 0, 0, 5], -] - -def process_threats( - threat_data: list[tuple[int, str, str]], - report: SpeciesReport, -) -> list[tuple[int, int]]: - cleaned_threats = [] - for code, scope, severity in threat_data: - if scope is None or scope.lower() == "unknown": - scope = DEFAULT_SCOPE - if severity is None or severity.lower() == "unknown": - 
severity = DEFAULT_SEVERITY - scope_index = SCOPES.index(scope.lower()) - severity_index = SEVERITIES.index(severity.lower()) - score = THREAT_WEIGHTING_TABLE[scope_index][severity_index] - if score > 0: - cleaned_threats.append((code, score)) - report.has_threats = len(cleaned_threats) > 0 - return cleaned_threats - -def process_habitats( - habitats_data: list[list[str]], - report: SpeciesReport, -) -> set: - if len(habitats_data) == 0: - # Promote to "Unknown" - habitats_data = [["18"]] - else: - report.has_habitats = True - if len(habitats_data) > 1: - raise ValueError("Expected only one habitat row") - - habitats = set() - for habitat_values_row in habitats_data: - assert len(habitat_values_row) == 1 - habitat_values = habitat_values_row[0] - - if habitat_values is None: - habitat_values = "18" - habitat_set = {x for x in habitat_values.split('|') if x} - habitats |= habitat_set - - if len(habitats) == 0: - raise ValueError("No filtered habitats") - report.keeps_habitats = True - - return habitats - -def process_geometries( - geometries_data: list[tuple[int, shapely.Geometry]], - report: SpeciesReport, -) -> shapely.Geometry: - if len(geometries_data) == 0: - raise ValueError("No geometries in DB") - report.has_geometries = True - - geometry = None - for geometry_row in geometries_data: - assert len(geometry_row) == 1 - row_geometry = geometry_row[0] - if row_geometry is None: - continue - - grange = shapely.normalize(shapely.from_wkb(row_geometry.to_ewkb())) - if grange.area == 0.0: - continue - - if geometry is None: - geometry = grange - else: - geometry = shapely.union(geometry, grange) - - if geometry is None: - raise ValueError("None geometry data in DB") - report.keeps_geometries = True - - return geometry - def process_row( class_name: str, output_directory_path: Path, @@ -341,6 +131,7 @@ def process_row( elevation_lower, elevation_upper, scientific_name, family_name, category = row report = SpeciesReport(id_no, assessment_id, scientific_name) + report.has_api_data = True # From Chess STAR report if possibly_extinct or possibly_extinct_in_the_wild: @@ -370,8 +161,12 @@ def process_row( cursor.execute(GEOMETRY_STATEMENT, (assessment_id, presence)) geometries_data = cursor.fetchall() + cleaned_geometries = [ + shapely.from_wkb(row_geometry[0].to_ewkb()) + for row_geometry in geometries_data if row_geometry[0] is not None + ] try: - geometry = process_geometries(geometries_data, report) + geometry = process_geometries(cleaned_geometries, report) except ValueError as _exc: return report diff --git a/prepare_species/extract_species_data_redlist.py b/prepare_species/extract_species_data_redlist.py new file mode 100644 index 0000000..41cfd89 --- /dev/null +++ b/prepare_species/extract_species_data_redlist.py @@ -0,0 +1,511 @@ +#!/usr/bin/env python3 +""" +Extract species data from IUCN Red List spatial data + API. + +This script combines two data sources: +1. IUCN Red List spatial data (shapefiles) - for range geometries with presence/origin filtering +2. IUCN Red List API v4 (via redlistapi package) - for assessment data (threats, habitats, elevation) + +REQUIRED SETUP: +============== + +1. 
Download IUCN Red List Spatial Data:
+   - Go to: https://www.iucnredlist.org/resources/spatial-data-download
+   - Create an account (free for non-commercial use)
+   - Download the shapefile for your target taxonomic group(s):
+     * MAMMALS
+     * AMPHIBIANS
+     * REPTILES
+     * BIRDS
+   - Extract the shapefiles to a known location on your computer
+   - Note: Each download contains a shapefile (.shp) and associated files (.dbf, .shx, .prj)
+
+2. Get an IUCN Red List API Token:
+   - Go to: https://api.iucnredlist.org/users/sign_up
+   - Sign up for a free API account
+   - You'll receive an API token via email
+   - Set the token as an environment variable:
+     export REDLIST_API_TOKEN="your_token_here"
+
+USAGE:
+======
+python3 extract_species_data_redlist.py \
+    --shapefile /path/to/MAMMALS_DIRECTORY \
+    --class MAMMALIA \
+    --output /path/to/output/species-info/MAMMALIA/ \
+    --projection "ESRI:54009" \
+    --excludes /path/to/SpeciesList_generalisedRangePolygons.csv
+
+Or for a single shapefile:
+python3 extract_species_data_redlist.py \
+    --shapefile /path/to/MAMMALS.shp \
+    --class MAMMALIA \
+    --output /path/to/output/species-info/MAMMALIA/
+
+The script will:
+- Read shapefile(s) and filter by presence (1, 2) and origin (1, 2, 6)
+- If a directory is provided, load and merge all .shp files in that directory
+- Query the IUCN API for each species to get threats, habitats, elevation
+- Merge the data and save per-species GeoJSON files
+- Generate a report CSV showing what data was successfully retrieved
+"""
+
+import argparse
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import Optional
+
+import geopandas as gpd
+import pandas as pd
+import redlistapi
+import requests.exceptions
+import shapely
+
+from common import (
+    CATEGORY_WEIGHTS,
+    COLUMNS,
+    process_habitats,
+    process_geometries,
+    process_threats,
+    process_systems,
+    tidy_reproject_save,
+    SpeciesReport,
+)
+
+logger = logging.getLogger(__name__)
+logging.basicConfig()
+logger.setLevel(logging.DEBUG)
+
+
+def process_systems_from_api(assessment: redlistapi.Assessment, report: SpeciesReport) -> list:
+    """Extract and validate systems data from API response."""
+    systems_data = assessment.systems_as_pandas().to_dict(orient='records')
+    systems = "|".join([x['description'] for x in systems_data])
+    return process_systems([(systems,)], report)
+
+def process_threats_from_api(assessment: redlistapi.Assessment, report: SpeciesReport) -> list:
+    """Extract and process threat data from API, applying STAR weighting."""
+    threats_data = assessment.threats_as_pandas().to_dict(orient='records')
+
+    # API uses underscores (e.g., "2_3_2") but we need dots (e.g., "2.3.2") for consistency with DB format
+    threats = [(
+        threat.get('code', '').replace('_', '.'),
+        threat.get('scope'),
+        threat.get('severity'),
+    ) for threat in threats_data if threat.get('timing') != 'Past, Unlikely to Return']
+
+    return process_threats(threats, report)
+
+def process_habitats_from_api(assessment: redlistapi.Assessment, report: SpeciesReport) -> set:
+    habitats_data = assessment.habitats_as_pandas().to_dict(orient='records')
+    # API uses underscores (e.g., "1_5") but we need dots (e.g., "1.5") for consistency with DB format
+    # Convert codes and join with pipe separator to match DB format
+    codes_list = [habitat.get('code', '').replace('_', '.') for habitat in habitats_data]
+    codes = [['|'.join(codes_list)]]
+    return process_habitats(codes, report)
+
+def process_geometries_from_api(geometries: pd.DataFrame, report: SpeciesReport) -> shapely.Geometry:
+    # Reworking data to look like database data for common code
+    reformatted_geometries = []
+    for _, row in geometries.iterrows():
+        geom = row.geometry
+        reformatted_geometries.append(geom)
+    return process_geometries(reformatted_geometries, report)
+
+def get_elevation_from_api(assessment: dict, report: SpeciesReport) -> tuple:
+    """Extract elevation limits from API response."""
+    supplementary_info = assessment.get('supplementary_info', {})
+
+    elevation_lower = supplementary_info.get('lower_elevation_limit')
+    elevation_upper = supplementary_info.get('upper_elevation_limit')
+
+    if elevation_lower is not None or elevation_upper is not None:
+        report.has_elevation = True
+
+    if elevation_lower is not None:
+        elevation_lower = int(elevation_lower)
+    if elevation_upper is not None:
+        elevation_upper = int(elevation_upper)
+
+    return elevation_lower, elevation_upper
+
+def process_species(
+    token: str,
+    class_name: str,
+    output_directory_path: Path,
+    target_projection: Optional[str],
+    base_presence_filter: tuple[int, ...],
+    species_data: tuple,
+) -> SpeciesReport:
+    """
+    Process a single species, combining all its range geometries.
+
+    This function:
+    1. Queries the IUCN API for assessment data
+    2. Determines correct presence filter based on possibly_extinct status
+    3. Filters and unions all geometry polygons for the species
+    4. Merges both data sources
+    5. Saves the result as a GeoJSON file
+    """
+
+    id_no, species_gdf = species_data
+    scientific_name = species_gdf.iloc[0]['scientific_name']
+
+    report = SpeciesReport(id_no, None, scientific_name)
+
+    logger.info("Processing %s (%s)", id_no, scientific_name)
+
+    # Get assessment from API. The Redlistapi package returns the most recent assessment when you
+    # use `from_taxid`
+    try:
+        factory = redlistapi.AssessmentFactory(token)
+        assessment = factory.from_taxid(id_no, scope='1')
+        report.has_api_data = True
+    except (ValueError) as exc:
+        logger.error("Failed to get API data for %s: %s", id_no, exc)
+        return report
+    except (requests.exceptions.RequestException) as exc:
+        logger.error("Network error for %s: %s", id_no, exc)
+        return report
+
+    # Whilst you can do `assessment.assessment` to get the original data as a dict,
+    # there is a bunch of data cleaning that is done as part of `assessment_as_pandas` which
+    # is nice to have, so we call that and then convert it back to a dict for Python
+    # convenience.
+    assessment_dict = assessment.assessment_as_pandas().to_dict(orient='records')[0]
+
+    try:
+        assessment_id = assessment_dict['assid']
+        assessment_year = assessment_dict['assessment_date'].year
+        category = assessment_dict['red_list_category']
+        family_name = assessment_dict['family_name']
+        possibly_extinct = assessment_dict['possibly_extinct']
+        possibly_extinct_in_the_wild = assessment_dict['possibly_extinct_in_the_wild']
+        infrarank = assessment_dict['infrarank']
+    except KeyError as exc:
+        logger.error("Failed to get data from assessment record for %s: %s", id_no, exc)
+        return report
+
+    # Remove subspecies and non-global assessments. This happens in the original SQL in the Postgres
+    # version. Note that the "True" is because the redlistapi package casts this to a string with
+    # the values of "True", "False", and "None".
+ if infrarank == "True": + logger.debug("Dropping %s: infrarank not none: %s", id_no, infrarank) + return report + + report.assessment_id = assessment_id + + # From Chess STAR report: adjust presence filter for possibly extinct species + presence_filter = base_presence_filter + if possibly_extinct or possibly_extinct_in_the_wild: + presence_filter = presence_filter + (4,) + report.possibly_extinct = True + + # Only process species in threat categories + if category not in CATEGORY_WEIGHTS: + logger.debug("Dropping %s: category %s not in %s", id_no, category, list(CATEGORY_WEIGHTS.keys())) + return report + + # Process systems + try: + systems = process_systems_from_api(assessment, report) + except ValueError as exc: + logger.debug("Dropping %s: %s", id_no, exc) + return report + + # Process threats + threats = process_threats_from_api(assessment, report) + if len(threats) == 0: + logger.debug("Dropping %s: no threats", id_no) + return report + + # Process habitats + try: + habitats = process_habitats_from_api(assessment, report) + except ValueError as exc: + logger.debug("Dropping %s: %s", id_no, exc) + return report + + # Get elevation + elevation_lower, elevation_upper = get_elevation_from_api(assessment_dict, report) + + # Now filter and union geometries based on the correct presence filter + # Filter by presence codes (now that we know if species is possibly extinct) + filtered_gdf = species_gdf[species_gdf['presence'].isin(presence_filter)] + + try: + geometry = process_geometries_from_api(filtered_gdf, report) + except ValueError as exc: + logger.debug("Dropping %s: %s", id_no, exc) + return report + + # Create GeoDataFrame with all data + gdf = gpd.GeoDataFrame( + [[ + id_no, + assessment_id, + int(assessment_year) if assessment_year else None, + "all", # season + systems, + elevation_lower, + elevation_upper, + '|'.join(list(habitats)), + scientific_name, + family_name, + class_name, + json.dumps(threats), + category, + CATEGORY_WEIGHTS[category], + geometry + ]], + columns=COLUMNS, + crs='EPSG:4326' + ) + + # Save to file + tidy_reproject_save(gdf, report, output_directory_path, target_projection) + + return report + + +def extract_data_from_shapefile( + shapefile_path: Path, + class_name: str, + token: str, + excludes_path: Optional[Path], + output_directory_path: Path, + target_projection: Optional[str], + presence_filter: tuple[int, ...], + origin_filter: tuple[int, ...], + rate_limit: float, +) -> None: + """ + Extract species data from IUCN shapefile(s) combined with API data. 
+    Args:
+        shapefile_path: Path to IUCN Red List shapefile or directory containing shapefiles
+        class_name: Taxonomic class (e.g., "MAMMALIA")
+        token: IUCN API token
+        excludes_path: Optional CSV of species IDs to exclude
+        output_directory_path: Where to save output files
+        target_projection: Target CRS for output
+        presence_filter: Presence codes to include (default: 1, 2)
+        origin_filter: Origin codes to include (default: 1, 2, 6)
+        rate_limit: Minimum time in seconds between Red List API requests
+    """
+
+    if shapefile_path.is_dir():
+        shapefiles = list(shapefile_path.glob("*.shp"))
+        if len(shapefiles) == 0:
+            raise ValueError(f"No shapefiles found in directory: {shapefile_path}")
+
+        logger.info("Found %d shapefile(s) in %s", len(shapefiles), shapefile_path)
+
+        gdfs = []
+        for shp_file in shapefiles:
+            gdf_part = gpd.read_file(shp_file)
+            gdfs.append(gdf_part)
+
+        gdf = pd.concat(gdfs, ignore_index=True)
+        logger.info("Combined %d total rows from %d shapefile(s)", len(gdf), len(shapefiles))
+    else:
+        gdf = gpd.read_file(shapefile_path)
+        logger.info("Loaded %d rows from shapefile", len(gdf))
+
+
+    # Normalise column names (shapefiles may have different conventions)
+    column_map = {}
+    for col in gdf.columns:
+        col_lower = col.lower()
+        if col_lower in ['binomial', 'sci_name', 'scientific_name']:
+            column_map[col] = 'scientific_name'
+        elif col_lower in ['id_no', 'sisid', 'sis_id']:
+            column_map[col] = 'id_no'
+        elif col_lower in ['presence', 'pres']:
+            column_map[col] = 'presence'
+        elif col_lower in ['origin', 'orig']:
+            column_map[col] = 'origin'
+
+    gdf = gdf.rename(columns=column_map)
+
+    # Filter by origin (but NOT presence yet - we need to check possibly_extinct first)
+    if 'origin' in gdf.columns:
+        before = len(gdf)
+        gdf = gdf[gdf['origin'].isin(origin_filter)]
+        logger.info("Filtered by origin %s: %d -> %d rows", origin_filter, before, len(gdf))
+    else:
+        raise ValueError("No 'origin' column found in shapefile - cannot filter by origin")
+
+    if 'presence' not in gdf.columns:
+        raise ValueError("Shapefile must have a 'presence' column")
+
+    excludes = set()
+    if excludes_path:
+        try:
+            df = pd.read_csv(excludes_path)
+            excludes = set(df.id_no.unique())
+            logger.info("Excluding %d species from %s", len(excludes), excludes_path)
+        except (FileNotFoundError, pd.errors.ParserError, KeyError, ValueError) as exc:
+            raise ValueError(f"Could not load excludes file: {excludes_path}") from exc
+
+    if excludes:
+        before = len(gdf)
+        gdf = gdf[~gdf['id_no'].isin(excludes)]
+        logger.info("After excluding species: %d -> %d shapes", before, len(gdf))
+
+    # Group by species
+    # The shapefile has multiple rows per species (one per range polygon)
+    # We'll pass all rows for each species to the processing function so it can
+    # apply the correct presence filter based on possibly_extinct status
+    species_groups = []
+    for id_no, group in gdf.groupby('id_no'):
+        species_groups.append((id_no, group))
+
+    logger.info("Grouped into %d unique species", len(species_groups))
+
+    era = "current"
+    era_output_directory_path = output_directory_path / era
+
+    logger.info("Processing %d species for %s in %s scenario", len(species_groups), class_name, era)
+
+    reports = []
+    last = 0.0
+    for species in species_groups:
+
+        # The IUCN Redlist API v4 is currently rate limited to 120 requests per minute
+        now = time.time()
+        time_since_last_call = now - last
+        if time_since_last_call < rate_limit:
+            time.sleep(rate_limit - time_since_last_call)
+        # Take the timestamp after any sleep so successive requests stay at least rate_limit apart
+        last = time.time()
+
+        result = process_species(
+            token,
+            class_name,
+            era_output_directory_path,
+            target_projection,
presence_filter, + species + ) + reports.append(result) + + reports_df = pd.DataFrame( + [x.as_row() for x in reports], + columns=SpeciesReport.REPORT_COLUMNS + ).sort_values('id_no') + + os.makedirs(era_output_directory_path, exist_ok=True) + reports_df.to_csv(era_output_directory_path / "report.csv", index=False) + + logger.info("Saved report to %s", era_output_directory_path / 'report.csv') + + total = len(reports) + with_files = reports_df['filename'].notna().sum() + logger.info("Successfully processed %d/%d species", with_files, total) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Extract species data from IUCN Red List shapefile + API.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__ + ) + parser.add_argument( + '--shapefile', + type=Path, + help="Path to IUCN Red List shapefile (e.g., MAMMALS.shp) or directory containing shapefiles", + required=True, + ) + parser.add_argument( + '--class', + type=str, + help="Species class name (e.g., MAMMALIA, AMPHIBIA, REPTILIA, AVES)", + required=True, + dest="classname", + ) + parser.add_argument( + '--excludes', + type=Path, + help="CSV of taxon IDs to exclude (SpeciesList_generalisedRangePolygons.csv)", + required=False, + dest="excludes" + ) + parser.add_argument( + '--output', + type=Path, + help='Directory where per species GeoJSON files will be stored', + required=True, + dest='output_directory_path', + ) + parser.add_argument( + '--projection', + type=str, + help="Target projection (default: ESRI:54009)", + required=False, + dest="target_projection", + default="ESRI:54009" + ) + parser.add_argument( + '--presence', + type=str, + help="Comma-separated presence codes to include (default: 1,2)", + default="1,2", + ) + parser.add_argument( + '--origin', + type=str, + help="Comma-separated origin codes to include (default: 1,2,6)", + default="1,2,6", + ) + parser.add_argument( + '--rate_limit', + type=float, + help="Time between Redlist API requests (default: 0.5)", + default=0.5, + ) + + args = parser.parse_args() + + # Get API token + token = os.getenv('REDLIST_API_TOKEN') + if not token: + print("ERROR: REDLIST_API_TOKEN environment variable not set") + print("Get a token from: https://api.iucnredlist.org/users/sign_up") + print("Then set it with: export REDLIST_API_TOKEN='your_token_here'") + sys.exit(1) + + # Parse presence and origin filters + presence_filter = tuple(int(x) for x in args.presence.split(',')) + origin_filter = tuple(int(x) for x in args.origin.split(',')) + + # Verify shapefile or directory exists + if not args.shapefile.exists(): + print(f"ERROR: Path not found: {args.shapefile}") + print("\nDownload shapefiles from: https://www.iucnredlist.org/resources/spatial-data-download") + sys.exit(1) + + if args.shapefile.is_dir(): + # Check that directory contains at least one .shp file + shapefiles = list(args.shapefile.glob("*.shp")) + if len(shapefiles) == 0: + print(f"ERROR: No .shp files found in directory: {args.shapefile}") + sys.exit(1) + + extract_data_from_shapefile( + args.shapefile, + args.classname, + token, + args.excludes, + args.output_directory_path, + args.target_projection, + presence_filter, + origin_filter, + args.rate_limit, + ) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt index 720c71a..8077573 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,7 @@ psutil pymer4 pyproj scikit-image +redlistapi yirgacheffe>=1.9 aoh[validation]>=1.0 @@ -15,3 +16,4 @@ gdal[numpy] pylint mypy pytest 
+types-requests diff --git a/tests/test_species_filter.py b/tests/test_species_filter.py index b00a538..5d56910 100644 --- a/tests/test_species_filter.py +++ b/tests/test_species_filter.py @@ -2,7 +2,7 @@ import postgis import shapely -from prepare_species.extract_species_data_psql import process_habitats, process_geometries, \ +from prepare_species.common import process_habitats, process_geometries, \ process_threats, SpeciesReport def test_empty_report() -> None: @@ -65,33 +65,47 @@ def test_empty_geometry_list(): assert not report.keeps_geometries def test_simple_resident_species_geometry_filter_point(): - geoemetries_data = [ + geometries_data = [ (postgis.Geometry.from_ewkb("000000000140000000000000004010000000000000"),), ] + cleaned_geometries = [ + shapely.from_wkb(row_geometry[0].to_ewkb()) + for row_geometry in geometries_data if row_geometry[0] is not None + ] + print(cleaned_geometries) report = SpeciesReport(1, 2, "name") with pytest.raises(ValueError): - _ = process_geometries(geoemetries_data, report) + r = process_geometries(cleaned_geometries, report) + print(r) assert report.has_geometries assert not report.keeps_geometries def test_simple_resident_species_geometry_filter_polygon(): - geoemetries_data = [ + geometries_data = [ (postgis.Geometry.from_ewkb("0103000000010000000400000000000000000000000000000000000000000000000000F03F000000000000F03F9A9999999999B93F000000000000F03F00000000000000000000000000000000"),), # pylint: disable=C0301 ] + cleaned_geometries = [ + shapely.from_wkb(row_geometry[0].to_ewkb()) + for row_geometry in geometries_data if row_geometry[0] is not None + ] report = SpeciesReport(1, 2, "name") - res = process_geometries(geoemetries_data, report) + res = process_geometries(cleaned_geometries, report) assert res == shapely.Polygon([(0.0, 0.0), (0.1, 1.0), (1.0, 1.0), (0.0, 0.0)]) assert report.has_geometries assert report.keeps_geometries def test_simple_migratory_species_geometry_filter(): - geoemetries_data = [ + geometries_data = [ (postgis.Geometry.from_ewkb("0103000000010000000400000000000000000000000000000000000000000000000000F03F000000000000F03F9A9999999999B93F000000000000F03F00000000000000000000000000000000"),), # pylint: disable=C0301 (postgis.Geometry.from_ewkb("0103000000010000000400000000000000000000000000000000000000000000000000F03F000000000000F03F9A9999999999B93F000000000000F03F00000000000000000000000000000000"),), # pylint: disable=C0301 ] + cleaned_geometries = [ + shapely.from_wkb(row_geometry[0].to_ewkb()) + for row_geometry in geometries_data if row_geometry[0] is not None + ] report = SpeciesReport(1, 2, "name") - res = process_geometries(geoemetries_data, report) + res = process_geometries(cleaned_geometries, report) assert res == shapely.Polygon([(0.1, 1.0), (1.0, 1.0), (0.0, 0.0), (0.1, 1.0)]) assert report.has_geometries assert report.keeps_geometries
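Below is a minimal sketch of one further test that could sit alongside those above, exercising the default scope/severity fallback in `process_threats` from `prepare_species/common.py`. It is not part of this patch: the test name and the threat code "2.1" are illustrative, and the expected score of 9 is read off `THREAT_WEIGHTING_TABLE[1][2]`, i.e. the default "majority (50-90%)" scope combined with the default "slow, significant declines" severity.

```python
from prepare_species.common import process_threats, SpeciesReport


def test_threats_with_unknown_scope_and_severity_fall_back_to_defaults():
    # A scope/severity of None is promoted to DEFAULT_SCOPE/DEFAULT_SEVERITY,
    # which indexes THREAT_WEIGHTING_TABLE[1][2] == 9, so the threat is kept.
    threat_data = [("2.1", None, None)]
    report = SpeciesReport(1, 2, "name")
    res = process_threats(threat_data, report)
    assert res == [("2.1", 9)]
    assert report.has_threats
```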