From 625958c6bd576ff92b3ea0d91b0ed29a81c641a4 Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Thu, 16 Oct 2025 14:01:54 +0100 Subject: [PATCH 01/12] Add script to get Redlist data via API --- prepare_species/common.py | 201 +++++++ prepare_species/extract_species_data_psql.py | 203 +------ .../extract_species_data_redlist.py | 539 ++++++++++++++++++ 3 files changed, 751 insertions(+), 192 deletions(-) create mode 100644 prepare_species/common.py create mode 100644 prepare_species/extract_species_data_redlist.py diff --git a/prepare_species/common.py b/prepare_species/common.py new file mode 100644 index 0000000..97586d7 --- /dev/null +++ b/prepare_species/common.py @@ -0,0 +1,201 @@ +import os +from pathlib import Path +from typing import Any, Optional + +import aoh +import geopandas as gpd +import pyproj + +# To match the FABDEM elevation map we use +# different range min/max/separation +ELEVATION_MAX = 8580 +ELEVATION_MIN = -427 +ELEVATION_SPREAD = 12 + +COLUMNS = [ + "id_no", + "assessment_id", + "assessment_year", + "season", + "systems", + "elevation_lower", + "elevation_upper", + "full_habitat_code", + "scientific_name", + "family_name", + "class_name", + "threats", + "category", + "category_weight", + "geometry" +] + +# From Muir et al: For each species, a global STAR threat abatement (START) score +# is defined. This varies from zero for species of Least Concern to 100 +# for Near Threatened, 200 for Vulnerable, 300 for Endangered and +# 400 for Critically Endangered species (using established weighting +# ratios7,8) +CATEGORY_WEIGHTS = { + 'NT': 100, + 'VU': 200, + 'EN': 300, + 'CR': 400, +} + +# Mapping from API scope values to STAR scope indices +SCOPES = [ + "whole (>90%)", + "majority (50-90%)", + "minority (<50%)" +] +DEFAULT_SCOPE = "majority (50-90%)" + +# Mapping from API severity values to STAR severity indices +SEVERITIES = [ + "very rapid declines", + "rapid declines", + "slow, significant declines", + "negligible declines", + "no decline", + "causing/could cause fluctuations" +] +DEFAULT_SEVERITY = "slow, significant declines" + +# Taken from Muir et al 2021 Supplementary Table 2, indexed by SCOPE and then SEVERITY +THREAT_WEIGHTING_TABLE = [ + [63, 24, 10, 1, 0, 10], + [52, 18, 9, 0, 0, 9], + [24, 7, 5, 0, 0, 5], +] + +class SpeciesReport: + + REPORT_COLUMNS = [ + "id_no", + "assessment_id", + "scientific_name", + "possibly_extinct", + "has_systems", + "not_terrestrial_system", + "has_threats", + "has_habitats", + "keeps_habitats", + "has_geometries", + "keeps_geometries", + "filename", + ] + + def __init__(self, id_no, assessment_id, scientific_name): + self.info = {k: False for k in self.REPORT_COLUMNS} + self.id_no = id_no + self.assessment_id = assessment_id + self.scientific_name = scientific_name + + def __getstate__(self): + return vars(self) + + def __setstate__(self, state): + vars(self).update(state) + + def __setattr__(self, name: str, value: Any) -> None: + if name in self.REPORT_COLUMNS: + self.info[name] = value + super().__setattr__(name, value) + + def __getattr__(self, name: str) -> Any: + if name in self.REPORT_COLUMNS: + return self.info[name] + return None + + def as_row(self) -> list: + return [self.info[k] for k in self.REPORT_COLUMNS] + +def process_systems( + systems_data: list[tuple], + report: SpeciesReport, +) -> list: + if len(systems_data) == 0: + raise ValueError("No systems found") + if len(systems_data) > 1: + raise ValueError("More than one systems aggregation found") + systems = systems_data[0][0] + if systems is None: + raise 
ValueError("no systems info") + report.has_systems = True + + if "Terrestrial" not in systems: + raise ValueError("No Terrestrial in systems") + report.not_terrestrial_system = True + + return systems + +def process_threats( + threat_data: list[tuple[int, str, str]], + report: SpeciesReport, +) -> list[tuple[int, int]]: + cleaned_threats = [] + for code, scope, severity in threat_data: + if scope is None or scope.lower() == "unknown": + scope = DEFAULT_SCOPE + if severity is None or severity.lower() == "unknown": + severity = DEFAULT_SEVERITY + scope_index = SCOPES.index(scope.lower()) + severity_index = SEVERITIES.index(severity.lower()) + score = THREAT_WEIGHTING_TABLE[scope_index][severity_index] + if score > 0: + cleaned_threats.append((code, score)) + report.has_threats = len(cleaned_threats) > 0 + return cleaned_threats + +def process_habitats( + habitats_data: list[list[str]], + report: SpeciesReport, +) -> set: + if len(habitats_data) == 0: + # Promote to "Unknown" + habitats_data = [["18"]] + else: + report.has_habitats = True + if len(habitats_data) > 1: + raise ValueError("Expected only one habitat row") + + habitats = set() + for habitat_values_row in habitats_data: + assert len(habitat_values_row) == 1 + habitat_values = habitat_values_row[0] + + if habitat_values is None: + habitat_values = "18" + habitat_set = {x for x in habitat_values.split('|') if x} + habitats |= habitat_set + + if len(habitats) == 0: + raise ValueError("No filtered habitats") + report.keeps_habitats = True + + return habitats + + +def tidy_reproject_save( + gdf: gpd.GeoDataFrame, + report: SpeciesReport, + output_directory_path: Path, + target_projection: Optional[str], +) -> None: + """Tidy the data, reproject it, and save to GeoJSON.""" + src_crs = pyproj.CRS.from_epsg(4326) + target_crs = pyproj.CRS.from_string(target_projection) if target_projection else src_crs + + graw = gdf.loc[0].copy() + grow = aoh.tidy_data( + graw, + elevation_max=ELEVATION_MAX, + elevation_min=ELEVATION_MIN, + elevation_seperation=ELEVATION_SPREAD, + ) + os.makedirs(output_directory_path, exist_ok=True) + output_path = output_directory_path / f"{grow.id_no}.geojson" + res = gpd.GeoDataFrame(grow.to_frame().transpose(), crs=src_crs, geometry="geometry") + res_projected = res.to_crs(target_crs) + res_projected.to_file(output_path, driver="GeoJSON") + report.filename = output_path diff --git a/prepare_species/extract_species_data_psql.py b/prepare_species/extract_species_data_psql.py index 43b65c7..a437df2 100644 --- a/prepare_species/extract_species_data_psql.py +++ b/prepare_species/extract_species_data_psql.py @@ -1,3 +1,4 @@ + import argparse import json import logging @@ -6,57 +7,28 @@ from functools import partial from multiprocessing import Pool from pathlib import Path -from typing import Any, Optional +from typing import Optional -import aoh import geopandas as gpd import pandas as pd -import pyproj import psycopg2 import shapely from postgis.psycopg import register +from common import ( + CATEGORY_WEIGHTS, + COLUMNS, + process_habitats, + process_threats, + process_systems, + tidy_reproject_save, + SpeciesReport, +) logger = logging.getLogger(__name__) logging.basicConfig() logger.setLevel(logging.DEBUG) -# To match the FABDEM elevation map we use -# different range min/max/separation -ELEVATION_MAX = 8580 -ELEVATION_MIN = -427 -ELEVATION_SPREAD = 12 - -COLUMNS = [ - "id_no", - "assessment_id", - "assessment_year", - "season", - "systems", - "elevation_lower", - "elevation_upper", - "full_habitat_code", - 
"scientific_name", - "family_name", - "class_name", - "threats", - "category", - "category_weight", - "geometry" -] - -# From Muir et al: For each species, a global STAR threat abatement (START) score -# is defined. This varies from zero for species of Least Concern to 100 -# for Near Threatened, 200 for Vulnerable, 300 for Endangered and -# 400 for Critically Endangered species (using established weighting -# ratios7,8) -CATEGORY_WEIGHTS = { - 'NT': 100, - 'VU': 200, - 'EN': 300, - 'CR': 400, -} - MAIN_STATEMENT = """ SELECT assessments.sis_taxon_id as id_no, @@ -142,159 +114,6 @@ f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" ) -class SpeciesReport: - - REPORT_COLUMNS = [ - "id_no", - "assessment_id", - "scientific_name", - "possibly_extinct", - "has_systems", - "not_terrestrial_system", - "has_threats", - "has_habitats", - "keeps_habitats", - "has_geometries", - "keeps_geometries", - "filename", - ] - - def __init__(self, id_no, assessment_id, scientific_name): - self.info = {k: False for k in self.REPORT_COLUMNS} - self.id_no = id_no - self.assessment_id = assessment_id - self.scientific_name = scientific_name - - def __getstate__(self): - return vars(self) - - def __setstate__(self, state): - vars(self).update(state) - - def __setattr__(self, name: str, value: Any) -> None: - if name in self.REPORT_COLUMNS: - self.info[name] = value - super().__setattr__(name, value) - - def __getattr__(self, name: str) -> Any: - if name in self.REPORT_COLUMNS: - return self.info[name] - return None - - def as_row(self) -> list: - return [self.info[k] for k in self.REPORT_COLUMNS] - -def tidy_reproject_save( - gdf: gpd.GeoDataFrame, - report: SpeciesReport, - output_directory_path: Path, - target_projection: Optional[str], -) -> None: - src_crs = pyproj.CRS.from_epsg(4326) - target_crs = pyproj.CRS.from_string(target_projection) if target_projection else src_crs - - graw = gdf.loc[0].copy() - grow = aoh.tidy_data( - graw, - elevation_max=ELEVATION_MAX, - elevation_min=ELEVATION_MIN, - elevation_seperation=ELEVATION_SPREAD, - ) - os.makedirs(output_directory_path, exist_ok=True) - output_path = output_directory_path / f"{grow.id_no}.geojson" - res = gpd.GeoDataFrame(grow.to_frame().transpose(), crs=src_crs, geometry="geometry") - res_projected = res.to_crs(target_crs) - res_projected.to_file(output_path, driver="GeoJSON") - report.filename = output_path - -def process_systems( - systems_data: list[tuple], - report: SpeciesReport, -) -> list: - if len(systems_data) == 0: - raise ValueError("No systems found") - if len(systems_data) > 1: - raise ValueError("More than one systems aggregation found") - systems = systems_data[0][0] - if systems is None: - raise ValueError("no systems info") - report.has_systems = True - - if "Terrestrial" not in systems: - raise ValueError("No Terrestrial in systems") - report.not_terrestrial_system = True - - return systems - -SCOPES = [ - "whole (>90%)", - "majority (50-90%)", - "minority (<50%)" -] -DEFAULT_SCOPE = "majority (50-90%)" -SEVERITIES = [ - "very rapid declines", - "rapid declines", - "slow, significant declines", - "negligible declines", - "no decline", - "causing/could cause fluctuations" -] -DEFAULT_SEVERITY = "slow, significant declines" - -# Taken from Muir et al 2021 Supplementary Table 2, indexed by SCOPE and then SEVERITY -THREAT_WEIGHTING_TABLE = [ - [63, 24, 10, 1, 0, 10], - [52, 18, 9, 0, 0, 9], - [24, 7, 5, 0, 0, 5], -] - -def process_threats( - threat_data: list[tuple[int, str, str]], - report: SpeciesReport, -) -> 
list[tuple[int, int]]: - cleaned_threats = [] - for code, scope, severity in threat_data: - if scope is None or scope.lower() == "unknown": - scope = DEFAULT_SCOPE - if severity is None or severity.lower() == "unknown": - severity = DEFAULT_SEVERITY - scope_index = SCOPES.index(scope.lower()) - severity_index = SEVERITIES.index(severity.lower()) - score = THREAT_WEIGHTING_TABLE[scope_index][severity_index] - if score > 0: - cleaned_threats.append((code, score)) - report.has_threats = len(cleaned_threats) > 0 - return cleaned_threats - -def process_habitats( - habitats_data: list[list[str]], - report: SpeciesReport, -) -> set: - if len(habitats_data) == 0: - # Promote to "Unknown" - habitats_data = [["18"]] - else: - report.has_habitats = True - if len(habitats_data) > 1: - raise ValueError("Expected only one habitat row") - - habitats = set() - for habitat_values_row in habitats_data: - assert len(habitat_values_row) == 1 - habitat_values = habitat_values_row[0] - - if habitat_values is None: - habitat_values = "18" - habitat_set = {x for x in habitat_values.split('|') if x} - habitats |= habitat_set - - if len(habitats) == 0: - raise ValueError("No filtered habitats") - report.keeps_habitats = True - - return habitats - def process_geometries( geometries_data: list[tuple[int, shapely.Geometry]], report: SpeciesReport, diff --git a/prepare_species/extract_species_data_redlist.py b/prepare_species/extract_species_data_redlist.py new file mode 100644 index 0000000..6927760 --- /dev/null +++ b/prepare_species/extract_species_data_redlist.py @@ -0,0 +1,539 @@ +#!/usr/bin/env python3 +""" +Extract species data from IUCN Red List spatial data + API. + +This script combines two data sources: +1. IUCN Red List spatial data (shapefiles) - for range geometries with presence/origin filtering +2. IUCN Red List API v4 (via redlistapi package) - for assessment data (threats, habitats, elevation) + +REQUIRED SETUP: +============== + +1. Download IUCN Red List Spatial Data: + - Go to: https://www.iucnredlist.org/resources/spatial-data-download + - Create an account (free for non-commercial use) + - Download the shapefile for your target taxonomic group(s): + * MAMMALS + * AMPHIBIANS + * REPTILES + * BIRDS (if available) + - Extract the shapefiles to a known location on your computer + - Note: Each download contains a shapefile (.shp) and associated files (.dbf, .shx, .prj) + +2. 
Get an IUCN Red List API Token: + - Go to: https://api.iucnredlist.org/users/sign_up + - Sign up for a free API account + - You'll receive an API token via email + - Set the token as an environment variable: + export IUCN_REDLIST_TOKEN="your_token_here" + +USAGE: +====== +python3 extract_species_data_redlist.py \ + --shapefile /path/to/MAMMALS_DIRECTORY \ + --class MAMMALIA \ + --output /path/to/output/species-info/MAMMALIA/ \ + --projection "ESRI:54009" \ + --excludes /path/to/SpeciesList_generalisedRangePolygons.csv + +Or for a single shapefile: +python3 extract_species_data_redlist.py \ + --shapefile /path/to/MAMMALS.shp \ + --class MAMMALIA \ + --output /path/to/output/species-info/MAMMALIA/ + +The script will: +- Read shapefile(s) and filter by presence (1, 2) and origin (1, 2, 6) +- If a directory is provided, load and merge all .shp files in that directory +- Query the IUCN API for each species to get threats, habitats, elevation +- Merge the data and save per-species GeoJSON files +- Generate a report CSV showing what data was successfully retrieved +""" + +import argparse +import json +import logging +import os +import sys +from functools import partial +from multiprocessing import Pool +from pathlib import Path +from typing import Optional + +import geopandas as gpd +import pandas as pd +import redlistapi +import requests.exceptions +import shapely + +from common import ( + CATEGORY_WEIGHTS, + COLUMNS, + process_habitats, + process_threats, + process_systems, + tidy_reproject_save, + SpeciesReport, +) + +logger = logging.getLogger(__name__) +logging.basicConfig() +logger.setLevel(logging.DEBUG) + + +def process_systems_from_api(assessment_df: pd.DataFrame, report: SpeciesReport) -> str: + """Extract and validate systems data from API response.""" + # The assessment_as_pandas() returns terrestrial, freshwater, marine as boolean columns + systems_list = [] + if assessment_df['terrestrial'].iloc[0]: + systems_list.append("Terrestrial") + if assessment_df['freshwater'].iloc[0]: + systems_list.append("Freshwater") + if assessment_df['marine'].iloc[0]: + systems_list.append("Marine") + + systems = "|".join(systems_list) + return process_systems([[systems]], report) + +def process_threats_from_api(assessment: dict, report: SpeciesReport) -> list: + """Extract and process threat data from API, applying STAR weighting.""" + threats_data = assessment.get('threats', []) + + # API uses underscores (e.g., "2_3_2") but we need dots (e.g., "2.3.2") for consistency with DB format + threats = [( + threat.get('code', '').replace('_', '.'), + threat.get('scope'), + threat.get('severity'), + ) for threat in threats_data] + + return process_threats(threats, report) + +def process_habitats_from_api(assessment: dict, report: SpeciesReport) -> set: + """Extract habitat codes from API response.""" + habitats_data = assessment.get('habitats', []) + # API uses underscores (e.g., "1_5") but we need dots (e.g., "1.5") for consistency with DB format + # Convert codes and join with pipe separator to match DB format + codes_list = [habitat.get('code', '').replace('_', '.') for habitat in habitats_data] + codes = [['|'.join(codes_list)]] + return process_habitats(codes, report) + +def get_elevation_from_api(assessment: dict, report: SpeciesReport) -> tuple: + """Extract elevation limits from API response.""" + supplementary_info = assessment.get('supplementary_info', {}) + + elevation_lower = supplementary_info.get('lower_elevation_limit') + elevation_upper = supplementary_info.get('upper_elevation_limit') + + 
if elevation_lower is not None or elevation_upper is not None: + report.has_elevation = True + + if elevation_lower is not None: + elevation_lower = int(elevation_lower) + if elevation_upper is not None: + elevation_upper = int(elevation_upper) + + return elevation_lower, elevation_upper + +def process_species( + token: str, + class_name: str, + output_directory_path: Path, + target_projection: Optional[str], + base_presence_filter: tuple[int, ...], + species_data: tuple, +) -> SpeciesReport: + """ + Process a single species, combining all its range geometries. + + This function: + 1. Queries the IUCN API for assessment data + 2. Determines correct presence filter based on possibly_extinct status + 3. Filters and unions all geometry polygons for the species + 4. Merges both data sources + 5. Saves the result as a GeoJSON file + """ + + id_no, scientific_name, species_gdf = species_data + + report = SpeciesReport(id_no, None, scientific_name) + + # Parse scientific name + parts = scientific_name.split() + genus_name = parts[0] + species_name = parts[1] if len(parts) > 1 else "" + + logger.info("Processing %s (id_no=%s)", scientific_name, id_no) + + # Query API for assessment data + try: + factory = redlistapi.AssessmentFactory(token) + assessment = factory.from_scientific_name(genus_name, species_name, scope='1') + report.has_api_data = True + except (requests.exceptions.RequestException, ValueError) as e: + # RequestException: Network errors, API errors (HTTPError from raise_for_status()) + # ValueError: 'Too many positive matches' or 'No match' from from_scientific_name() + logger.error("Failed to get API data for %s: %s", scientific_name, e) + return report + + # Get assessment data as pandas DataFrame and dict + try: + assessment_df = assessment.assessment_as_pandas() + # Note: redlistapi incorrectly types assessment as list, but it's actually a dict + assessment_dict: dict = assessment.assessment + except (KeyError, AttributeError, IndexError) as e: + # KeyError: Missing expected keys in API response + # AttributeError: Missing expected attributes in assessment object + # IndexError: Empty dataframe when accessing .iloc[0] + logger.error("Failed to extract assessment data for %s: %s", scientific_name, e) + return report + + # Extract assessment metadata + # pylint: disable=no-member + # (assessment_dict is incorrectly typed as list in redlistapi but is actually dict) + assessment_id = assessment_dict.get('assessment_id') + assessment_year = assessment_df['year_published'].iloc[0] + category = assessment_dict.get('red_list_category', {}).get('code') + family_name = assessment_dict.get('taxon', {}).get('family_name') + possibly_extinct = assessment_dict.get('possibly_extinct', False) + possibly_extinct_in_the_wild = assessment_dict.get('possibly_extinct_in_the_wild', False) + # pylint: enable=no-member + + report.assessment_id = assessment_id + + # From Chess STAR report: adjust presence filter for possibly extinct species + presence_filter = base_presence_filter + if possibly_extinct or possibly_extinct_in_the_wild: + presence_filter = presence_filter + (4,) + report.possibly_extinct = True + + # Only process species in threat categories + if category not in CATEGORY_WEIGHTS: + logger.debug("Dropping %s: category %s not in %s", scientific_name, category, list(CATEGORY_WEIGHTS.keys())) + return report + + # Process systems + try: + systems = process_systems_from_api(assessment_df, report) + except ValueError as exc: + logger.debug("Dropping %s: %s", scientific_name, exc) + return report 
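+
+    # A worked example of the STAR threat weighting used in the next step
+    # (values taken from SCOPES/SEVERITIES/THREAT_WEIGHTING_TABLE in common.py):
+    # scope "majority (50-90%)" is SCOPES index 1 and severity "rapid declines"
+    # is SEVERITIES index 1, so THREAT_WEIGHTING_TABLE[1][1] gives a score of
+    # 18 and the threat is kept as (code, 18); any threat rated "no decline"
+    # scores 0 in every scope row and is dropped by process_threats.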
+ + # Process threats + threats = process_threats_from_api(assessment_dict, report) + if len(threats) == 0: + logger.debug("Dropping %s: no threats", scientific_name) + return report + + # Process habitats + habitats = process_habitats_from_api(assessment_dict, report) + + # Get elevation + elevation_lower, elevation_upper = get_elevation_from_api(assessment_dict, report) + + # Now filter and union geometries based on the correct presence filter + # Filter by presence codes (now that we know if species is possibly extinct) + filtered_gdf = species_gdf[species_gdf['presence'].isin(presence_filter)] + + if len(filtered_gdf) == 0: + logger.debug("Dropping %s: no geometries after presence filtering", scientific_name) + return report + + report.has_geometry = True + + # Union all geometries for this species + geometries = [] + for _, row in filtered_gdf.iterrows(): + geom = row.geometry + if geom is not None and not geom.is_empty: + if not geom.is_valid: + geom = geom.buffer(0) + geometries.append(geom) + + if len(geometries) == 0: + logger.debug("Dropping %s: no valid geometries", scientific_name) + return report + + # Union all geometries + if len(geometries) == 1: + geometry = geometries[0] + else: + geometry = shapely.union_all(geometries) + + # Create GeoDataFrame with all data + gdf = gpd.GeoDataFrame( + [[ + id_no, + assessment_id, + int(assessment_year) if assessment_year else None, + "all", # season + systems, + elevation_lower, + elevation_upper, + '|'.join(list(habitats)), + scientific_name, + family_name, + class_name, + json.dumps(threats), + category, + CATEGORY_WEIGHTS[category], + geometry + ]], + columns=COLUMNS, + crs='EPSG:4326' + ) + + # Save to file + tidy_reproject_save(gdf, report, output_directory_path, target_projection) + + return report + + +def extract_data_from_shapefile( + shapefile_path: Path, + class_name: str, + token: str, + excludes_path: Optional[Path], + output_directory_path: Path, + target_projection: Optional[str], + presence_filter: tuple[int, ...], + origin_filter: tuple[int, ...], +) -> None: + """ + Extract species data from IUCN shapefile(s) combined with API data. 
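+
+    Note that presence filtering is deferred to process_species: with the
+    default presence filter of (1, 2), a species the API flags as possibly
+    extinct is re-filtered with (1, 2, 4), so its "possibly extinct" range
+    polygons are retained for that species only.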
+ + Args: + shapefile_path: Path to IUCN Red List shapefile or directory containing shapefiles + class_name: Taxonomic class (e.g., "MAMMALIA") + token: IUCN API token + excludes_path: Optional CSV of species IDs to exclude + output_directory_path: Where to save output files + target_projection: Target CRS for output + presence_filter: Presence codes to include (default: 1, 2) + origin_filter: Origin codes to include (default: 1, 2, 6) + """ + + # Check if path is a directory or a single shapefile + if shapefile_path.is_dir(): + # Find all .shp files in the directory + shapefiles = sorted(shapefile_path.glob("*.shp")) + if len(shapefiles) == 0: + raise ValueError(f"No shapefiles found in directory: {shapefile_path}") + + logger.info("Found %d shapefile(s) in %s", len(shapefiles), shapefile_path) + + # Load and concatenate all shapefiles + gdfs = [] + for shp_file in shapefiles: + logger.info("Reading shapefile: %s", shp_file.name) + gdf_part = gpd.read_file(shp_file) + logger.info(" Loaded %d rows from %s", len(gdf_part), shp_file.name) + gdfs.append(gdf_part) + + # Concatenate all dataframes + gdf = pd.concat(gdfs, ignore_index=True) + logger.info("Combined %d total rows from %d shapefile(s)", len(gdf), len(shapefiles)) + else: + # Single shapefile + logger.info("Reading shapefile: %s", shapefile_path) + gdf = gpd.read_file(shapefile_path) + logger.info("Loaded %d rows from shapefile", len(gdf)) + + logger.info("Columns: %s", gdf.columns.tolist()) + + # Normalise column names (shapefiles may have different conventions) + column_map = {} + for col in gdf.columns: + col_lower = col.lower() + if col_lower in ['binomial', 'sci_name', 'scientific_name']: + column_map[col] = 'binomial' + elif col_lower in ['id_no', 'sisid', 'sis_id']: + column_map[col] = 'id_no' + elif col_lower in ['presence', 'pres']: + column_map[col] = 'presence' + elif col_lower in ['origin', 'orig']: + column_map[col] = 'origin' + + gdf = gdf.rename(columns=column_map) + + # Filter by origin (but NOT presence yet - we need to check possibly_extinct first) + if 'origin' in gdf.columns: + before = len(gdf) + gdf = gdf[gdf['origin'].isin(origin_filter)] + logger.info("Filtered by origin %s: %d -> %d rows", origin_filter, before, len(gdf)) + else: + logger.warning("No 'origin' column found in shapefile - cannot filter by origin") + + # Check for presence column (needed later for per-species filtering) + if 'presence' not in gdf.columns: + logger.error("No 'presence' column found in shapefile - cannot filter by presence") + raise ValueError("Shapefile must have a 'presence' column") + + # Load excludes list + excludes = set() + if excludes_path and excludes_path.exists(): + try: + df = pd.read_csv(excludes_path) + excludes = set(df.id_no.unique()) + logger.info("Excluding %d species from %s", len(excludes), excludes_path) + except (pd.errors.ParserError, KeyError, ValueError) as e: + # ParserError: CSV parsing failed + # KeyError: 'id_no' column not found + # ValueError: Invalid data in id_no column + logger.warning("Could not load excludes file: %s", e) + + # Filter out excluded species + if excludes and 'id_no' in gdf.columns: + before = len(gdf) + gdf = gdf[~gdf['id_no'].isin(excludes)] + logger.info("After excluding species: %d -> %d shapes", before, len(gdf)) + + # Group by species + # The shapefile has multiple rows per species (one per range polygon) + # We'll pass all rows for each species to the processing function so it can + # apply the correct presence filter based on possibly_extinct status + logger.info("Grouping 
%d rows by species", len(gdf)) + + species_groups = [] + for id_no, group in gdf.groupby('id_no'): + # Get species name from first row (all rows for a species have the same name) + scientific_name = group.iloc[0]['binomial'] + + # Pass the entire group (all rows for this species) to the processing function + # The function will filter by presence and union geometries after checking possibly_extinct + species_groups.append((id_no, scientific_name, group)) + + logger.info("Grouped into %d unique species", len(species_groups)) + + # Process in current era only (not historic) + era = "current" + era_output_directory_path = output_directory_path / era + + logger.info("Processing %d species for %s in %s scenario", len(species_groups), class_name, era) + + # Process species - using multiprocessing + # Note: We use fewer processes than the DB version because we're making API calls + with Pool(processes=5) as pool: + reports = pool.map( + partial( + process_species, + token, + class_name, + era_output_directory_path, + target_projection, + presence_filter, + ), + species_groups + ) + + # Save report + reports_df = pd.DataFrame( + [x.as_row() for x in reports], + columns=SpeciesReport.REPORT_COLUMNS + ).sort_values('id_no') + + os.makedirs(era_output_directory_path, exist_ok=True) + reports_df.to_csv(era_output_directory_path / "report.csv", index=False) + + logger.info("Saved report to %s", era_output_directory_path / 'report.csv') + + # Print summary + total = len(reports) + with_files = reports_df['filename'].notna().sum() + logger.info("Successfully processed %d/%d species", with_files, total) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Extract species data from IUCN Red List shapefile + API.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__ + ) + parser.add_argument( + '--shapefile', + type=Path, + help="Path to IUCN Red List shapefile (e.g., MAMMALS.shp) or directory containing shapefiles", + required=True, + ) + parser.add_argument( + '--class', + type=str, + help="Species class name (e.g., MAMMALIA, AMPHIBIA, REPTILIA, AVES)", + required=True, + dest="classname", + ) + parser.add_argument( + '--excludes', + type=Path, + help="CSV of taxon IDs to exclude (SpeciesList_generalisedRangePolygons.csv)", + required=False, + dest="excludes" + ) + parser.add_argument( + '--output', + type=Path, + help='Directory where per species GeoJSON files will be stored', + required=True, + dest='output_directory_path', + ) + parser.add_argument( + '--projection', + type=str, + help="Target projection (default: ESRI:54009)", + required=False, + dest="target_projection", + default="ESRI:54009" + ) + parser.add_argument( + '--presence', + type=str, + help="Comma-separated presence codes to include (default: 1,2)", + default="1,2", + ) + parser.add_argument( + '--origin', + type=str, + help="Comma-separated origin codes to include (default: 1,2,6)", + default="1,2,6", + ) + + args = parser.parse_args() + + # Get API token + token = os.getenv('IUCN_REDLIST_TOKEN') + if not token: + print("ERROR: IUCN_REDLIST_TOKEN environment variable not set") + print("Get a token from: https://api.iucnredlist.org/users/sign_up") + print("Then set it with: export IUCN_REDLIST_TOKEN='your_token_here'") + sys.exit(1) + + # Parse presence and origin filters + presence_filter = tuple(int(x) for x in args.presence.split(',')) + origin_filter = tuple(int(x) for x in args.origin.split(',')) + + # Verify shapefile or directory exists + if not args.shapefile.exists(): + print(f"ERROR: 
Path not found: {args.shapefile}") + print("\nDownload shapefiles from: https://www.iucnredlist.org/resources/spatial-data-download") + sys.exit(1) + + if args.shapefile.is_dir(): + # Check that directory contains at least one .shp file + shapefiles = list(args.shapefile.glob("*.shp")) + if len(shapefiles) == 0: + print(f"ERROR: No .shp files found in directory: {args.shapefile}") + sys.exit(1) + + extract_data_from_shapefile( + args.shapefile, + args.classname, + token, + args.excludes, + args.output_directory_path, + args.target_projection, + presence_filter, + origin_filter, + ) + + +if __name__ == "__main__": + main() From 040403e17bc683467fd7891edc0dac2b499b9ceb Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Mon, 20 Oct 2025 15:05:43 +0100 Subject: [PATCH 02/12] More tweaks to Redlist API script --- .../extract_species_data_redlist.py | 132 +++++++----------- 1 file changed, 49 insertions(+), 83 deletions(-) diff --git a/prepare_species/extract_species_data_redlist.py b/prepare_species/extract_species_data_redlist.py index 6927760..22279b7 100644 --- a/prepare_species/extract_species_data_redlist.py +++ b/prepare_species/extract_species_data_redlist.py @@ -16,7 +16,7 @@ * MAMMALS * AMPHIBIANS * REPTILES - * BIRDS (if available) + * BIRDS - Extract the shapefiles to a known location on your computer - Note: Each download contains a shapefile (.shp) and associated files (.dbf, .shx, .prj) @@ -81,15 +81,15 @@ logger.setLevel(logging.DEBUG) -def process_systems_from_api(assessment_df: pd.DataFrame, report: SpeciesReport) -> str: +def process_systems_from_api(assessment: dict, report: SpeciesReport) -> str: """Extract and validate systems data from API response.""" # The assessment_as_pandas() returns terrestrial, freshwater, marine as boolean columns systems_list = [] - if assessment_df['terrestrial'].iloc[0]: + if assessment['terrestrial']: systems_list.append("Terrestrial") - if assessment_df['freshwater'].iloc[0]: + if assessment['freshwater']: systems_list.append("Freshwater") - if assessment_df['marine'].iloc[0]: + if assessment['marine']: systems_list.append("Marine") systems = "|".join(systems_list) @@ -153,51 +153,40 @@ def process_species( 5. Saves the result as a GeoJSON file """ - id_no, scientific_name, species_gdf = species_data + id_no, species_gdf = species_data + scientific_name = species_gdf.iloc[0]['scientific_name'] report = SpeciesReport(id_no, None, scientific_name) - # Parse scientific name - parts = scientific_name.split() - genus_name = parts[0] - species_name = parts[1] if len(parts) > 1 else "" + logger.info("Processing %s (%s)", id_no, scientific_name) - logger.info("Processing %s (id_no=%s)", scientific_name, id_no) - - # Query API for assessment data + # Get assessment from API. 
The Redlistapi package returns the most recent assessment when you
+    # use `from_taxid`
     try:
         factory = redlistapi.AssessmentFactory(token)
-        assessment = factory.from_scientific_name(genus_name, species_name, scope='1')
+        assessment = factory.from_taxid(id_no)
         report.has_api_data = True
     except (requests.exceptions.RequestException, ValueError) as e:
-        # RequestException: Network errors, API errors (HTTPError from raise_for_status())
-        # ValueError: 'Too many positive matches' or 'No match' from from_scientific_name()
         logger.error("Failed to get API data for %s: %s", scientific_name, e)
         return report
 
-    # Get assessment data as pandas DataFrame and dict
+    # Whilst you can do `assessment.assessment` to get the original data as a dict,
+    # there is a bunch of data cleaning that is done as part of `assessment_as_pandas` which
+    # is nice to have, so we call that and then convert it back to a dict for Python
+    # convenience.
+    assessment_dict = assessment.assessment_as_pandas().to_dict(orient='records')[0]
+
     try:
-        assessment_df = assessment.assessment_as_pandas()
-        # Note: redlistapi incorrectly types assessment as list, but it's actually a dict
-        assessment_dict: dict = assessment.assessment
-    except (KeyError, AttributeError, IndexError) as e:
-        # KeyError: Missing expected keys in API response
-        # AttributeError: Missing expected attributes in assessment object
-        # IndexError: Empty dataframe when accessing .iloc[0]
-        logger.error("Failed to extract assessment data for %s: %s", scientific_name, e)
+        assessment_id = assessment_dict['assid']
+        assessment_year = assessment_dict['assessment_date'].year
+        category = assessment_dict['red_list_category']
+        family_name = assessment_dict['family_name']
+        possibly_extinct = assessment_dict['possibly_extinct']
+        possibly_extinct_in_the_wild = assessment_dict['possibly_extinct_in_the_wild']
+    except KeyError as exc:
+        logger.error("Failed to get data from assessment record for %s: %s", id_no, exc)
         return report
 
-    # Extract assessment metadata
-    # pylint: disable=no-member
-    # (assessment_dict is incorrectly typed as list in redlistapi but is actually dict)
-    assessment_id = assessment_dict.get('assessment_id')
-    assessment_year = assessment_df['year_published'].iloc[0]
-    category = assessment_dict.get('red_list_category', {}).get('code')
-    family_name = assessment_dict.get('taxon', {}).get('family_name')
-    possibly_extinct = assessment_dict.get('possibly_extinct', False)
-    possibly_extinct_in_the_wild = assessment_dict.get('possibly_extinct_in_the_wild', False)
-    # pylint: enable=no-member
-
     report.assessment_id = assessment_id
 
     # From Chess STAR report: adjust presence filter for possibly extinct species
@@ -208,20 +197,20 @@ def process_species(
 
     # Only process species in threat categories
     if category not in CATEGORY_WEIGHTS:
-        logger.debug("Dropping %s: category %s not in %s", scientific_name, category, list(CATEGORY_WEIGHTS.keys()))
+        logger.debug("Dropping %s: category %s not in %s", id_no, category, list(CATEGORY_WEIGHTS.keys()))
         return report
 
     # Process systems
     try:
-        systems = process_systems_from_api(assessment_df, report)
+        systems = process_systems_from_api(assessment_dict, report)
     except ValueError as exc:
-        logger.debug("Dropping %s: %s", scientific_name, exc)
+        logger.debug("Dropping %s: %s", id_no, exc)
         return report
 
     # Process threats
     threats = process_threats_from_api(assessment_dict, report)
     if len(threats) == 0:
-        logger.debug("Dropping %s: no threats", scientific_name)
+        logger.debug("Dropping %s: no threats", id_no)
         return report
 
     # 
Process habitats @@ -235,12 +224,9 @@ def process_species( filtered_gdf = species_gdf[species_gdf['presence'].isin(presence_filter)] if len(filtered_gdf) == 0: - logger.debug("Dropping %s: no geometries after presence filtering", scientific_name) + logger.debug("Dropping %s: no geometries after presence filtering", id_no) return report - report.has_geometry = True - - # Union all geometries for this species geometries = [] for _, row in filtered_gdf.iterrows(): geom = row.geometry @@ -250,9 +236,11 @@ def process_species( geometries.append(geom) if len(geometries) == 0: - logger.debug("Dropping %s: no valid geometries", scientific_name) + logger.debug("Dropping %s: no valid geometries", id_no) return report + report.has_geometry = True + # Union all geometries if len(geometries) == 1: geometry = geometries[0] @@ -312,40 +300,31 @@ def extract_data_from_shapefile( origin_filter: Origin codes to include (default: 1, 2, 6) """ - # Check if path is a directory or a single shapefile if shapefile_path.is_dir(): - # Find all .shp files in the directory - shapefiles = sorted(shapefile_path.glob("*.shp")) + shapefiles = list(shapefile_path.glob("*.shp")) if len(shapefiles) == 0: raise ValueError(f"No shapefiles found in directory: {shapefile_path}") logger.info("Found %d shapefile(s) in %s", len(shapefiles), shapefile_path) - # Load and concatenate all shapefiles gdfs = [] for shp_file in shapefiles: - logger.info("Reading shapefile: %s", shp_file.name) gdf_part = gpd.read_file(shp_file) - logger.info(" Loaded %d rows from %s", len(gdf_part), shp_file.name) gdfs.append(gdf_part) - # Concatenate all dataframes gdf = pd.concat(gdfs, ignore_index=True) logger.info("Combined %d total rows from %d shapefile(s)", len(gdf), len(shapefiles)) else: - # Single shapefile - logger.info("Reading shapefile: %s", shapefile_path) gdf = gpd.read_file(shapefile_path) logger.info("Loaded %d rows from shapefile", len(gdf)) - logger.info("Columns: %s", gdf.columns.tolist()) # Normalise column names (shapefiles may have different conventions) column_map = {} for col in gdf.columns: col_lower = col.lower() if col_lower in ['binomial', 'sci_name', 'scientific_name']: - column_map[col] = 'binomial' + column_map[col] = 'scientific_name' elif col_lower in ['id_no', 'sisid', 'sis_id']: column_map[col] = 'id_no' elif col_lower in ['presence', 'pres']: @@ -361,28 +340,21 @@ def extract_data_from_shapefile( gdf = gdf[gdf['origin'].isin(origin_filter)] logger.info("Filtered by origin %s: %d -> %d rows", origin_filter, before, len(gdf)) else: - logger.warning("No 'origin' column found in shapefile - cannot filter by origin") + raise ValueError("No 'origin' column found in shapefile - cannot filter by origin") - # Check for presence column (needed later for per-species filtering) if 'presence' not in gdf.columns: - logger.error("No 'presence' column found in shapefile - cannot filter by presence") raise ValueError("Shapefile must have a 'presence' column") - # Load excludes list excludes = set() - if excludes_path and excludes_path.exists(): + if excludes_path: try: df = pd.read_csv(excludes_path) excludes = set(df.id_no.unique()) logger.info("Excluding %d species from %s", len(excludes), excludes_path) - except (pd.errors.ParserError, KeyError, ValueError) as e: - # ParserError: CSV parsing failed - # KeyError: 'id_no' column not found - # ValueError: Invalid data in id_no column - logger.warning("Could not load excludes file: %s", e) - - # Filter out excluded species - if excludes and 'id_no' in gdf.columns: + except 
(FileDoesNotExist, pd.errors.ParserError, KeyError, ValueError) as e:
+        raise ValueError("Could not load excludes file: %s", e)
+
+    if excludes:
         before = len(gdf)
         gdf = gdf[~gdf['id_no'].isin(excludes)]
         logger.info("After excluding species: %d -> %d shapes", before, len(gdf))
@@ -391,20 +363,12 @@ def extract_data_from_shapefile(
     # The shapefile has multiple rows per species (one per range polygon)
     # We'll pass all rows for each species to the processing function so it can
     # apply the correct presence filter based on possibly_extinct status
-    logger.info("Grouping %d rows by species", len(gdf))
-
     species_groups = []
     for id_no, group in gdf.groupby('id_no'):
-        # Get species name from first row (all rows for a species have the same name)
-        scientific_name = group.iloc[0]['binomial']
-
-        # Pass the entire group (all rows for this species) to the processing function
-        # The function will filter by presence and union geometries after checking possibly_extinct
-        species_groups.append((id_no, scientific_name, group))
+        species_groups.append((id_no, group))
 
     logger.info("Grouped into %d unique species", len(species_groups))
 
-    # Process in current era only (not historic)
     era = "current"
     era_output_directory_path = output_directory_path / era
 
@@ -424,8 +388,11 @@ def extract_data_from_shapefile(
         ),
         species_groups
     )
+    # reports = [
+    #     process_species(token, class_name, era_output_directory_path, target_projection, presence_filter, species)
+    #     for species in species_groups
+    # ]
 
-    # Save report
     reports_df = pd.DataFrame(
         [x.as_row() for x in reports],
         columns=SpeciesReport.REPORT_COLUMNS
@@ -436,7 +403,6 @@ def extract_data_from_shapefile(
 
     logger.info("Saved report to %s", era_output_directory_path / 'report.csv')
 
-    # Print summary
     total = len(reports)
     with_files = reports_df['filename'].notna().sum()
     logger.info("Successfully processed %d/%d species", with_files, total)
@@ -499,11 +465,11 @@ def main() -> None:
     args = parser.parse_args()
 
     # Get API token
-    token = os.getenv('IUCN_REDLIST_TOKEN')
+    token = os.getenv('REDLIST_API_TOKEN')
     if not token:
-        print("ERROR: IUCN_REDLIST_TOKEN environment variable not set")
+        print("ERROR: REDLIST_API_TOKEN environment variable not set")
         print("Get a token from: https://api.iucnredlist.org/users/sign_up")
-        print("Then set it with: export IUCN_REDLIST_TOKEN='your_token_here'")
+        print("Then set it with: export REDLIST_API_TOKEN='your_token_here'")
         sys.exit(1)
 
     # Parse presence and origin filters

From 9593ce69c4948022da94cc450756a934683c943e Mon Sep 17 00:00:00 2001
From: Michael Dales
Date: Mon, 20 Oct 2025 15:10:39 +0100
Subject: [PATCH 03/12] Add redlistapi dependency

---
 requirements.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/requirements.txt b/requirements.txt
index 720c71a..9bb9be7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,6 +6,7 @@ psutil
 pymer4
 pyproj
 scikit-image
+redlistapi
 yirgacheffe>=1.9
 aoh[validation]>=1.0
 
From eb57fbaed10977645ab7192b3291e14caaf8b9c9 Mon Sep 17 00:00:00 2001
From: Michael Dales
Date: Mon, 20 Oct 2025 15:33:30 +0100
Subject: [PATCH 04/12] Workflow to use Python 3.12 for redlistapi compatibility

---
 .github/workflows/python-package.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 5724e1c..f1baeec 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -16,7 +16,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        python-version: ["3.10"]
+        python-version: ["3.12"] 
steps: - name: Install system From eb1085445f376c1bdb4ac46a840c91cc34d55596 Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Mon, 20 Oct 2025 15:49:53 +0100 Subject: [PATCH 05/12] Fix linter issues --- prepare_species/extract_species_data_redlist.py | 8 ++++---- requirements.txt | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/prepare_species/extract_species_data_redlist.py b/prepare_species/extract_species_data_redlist.py index 22279b7..eef5ffc 100644 --- a/prepare_species/extract_species_data_redlist.py +++ b/prepare_species/extract_species_data_redlist.py @@ -81,7 +81,7 @@ logger.setLevel(logging.DEBUG) -def process_systems_from_api(assessment: dict, report: SpeciesReport) -> str: +def process_systems_from_api(assessment: dict, report: SpeciesReport) -> list: """Extract and validate systems data from API response.""" # The assessment_as_pandas() returns terrestrial, freshwater, marine as boolean columns systems_list = [] @@ -93,7 +93,7 @@ def process_systems_from_api(assessment: dict, report: SpeciesReport) -> str: systems_list.append("Marine") systems = "|".join(systems_list) - return process_systems([[systems]], report) + return process_systems([(systems,)], report) def process_threats_from_api(assessment: dict, report: SpeciesReport) -> list: """Extract and process threat data from API, applying STAR weighting.""" @@ -351,8 +351,8 @@ def extract_data_from_shapefile( df = pd.read_csv(excludes_path) excludes = set(df.id_no.unique()) logger.info("Excluding %d species from %s", len(excludes), excludes_path) - except (FileDoesNotExist, pd.errors.ParserError, KeyError, ValueError) as e: - raise ValueError("Could not load excludes file: %s", e) + except (FileNotFoundError, pd.errors.ParserError, KeyError, ValueError) as exc: + raise ValueError(f"Could not load excludes file: {excludes_path}") from exc if excludes: before = len(gdf) diff --git a/requirements.txt b/requirements.txt index 9bb9be7..8077573 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,4 @@ gdal[numpy] pylint mypy pytest +types-requests From 7fca4789d503cc0cf91459e6438fa3904fd505f9 Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Wed, 22 Oct 2025 07:31:06 +0100 Subject: [PATCH 06/12] More refactoring and get tests working again --- README.md | 5 +- prepare_species/common.py | 32 ++++++++++ prepare_species/extract_species_data_psql.py | 32 +--------- .../extract_species_data_redlist.py | 61 ++++++++----------- tests/test_species_filter.py | 2 +- 5 files changed, 59 insertions(+), 73 deletions(-) diff --git a/README.md b/README.md index bc552b6..b5b02e2 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,6 @@ There are two ways to run the pipeline. The easiest way is to use Docker if you ### Running with Docker - There is included a docker file, which is based on the GDAL container image, which is set up to install everything ready to use. You can build that using: ```shell @@ -64,9 +63,7 @@ With those you should set up a Python virtual environment to install all the req ```shell $ python3 -m venv ./venv $ . ./venv/bin/activate -(venv) $ gdalinfo --version -GDAL 3.11.3 "Eganville", released 2025/07/12 -(venv) $ pip install gdal[numpy]==3.11.3 +(venv) $ pip install gdal[numpy]==`gdal-config --version` ... 
(venv) $ pip install -r requirements.txt ``` diff --git a/prepare_species/common.py b/prepare_species/common.py index 97586d7..91e400b 100644 --- a/prepare_species/common.py +++ b/prepare_species/common.py @@ -5,6 +5,7 @@ import aoh import geopandas as gpd import pyproj +import shapely # To match the FABDEM elevation map we use # different range min/max/separation @@ -110,6 +111,37 @@ def __getattr__(self, name: str) -> Any: def as_row(self) -> list: return [self.info[k] for k in self.REPORT_COLUMNS] +def process_geometries( + geometries_data: list[tuple[shapely.Geometry]], + report: SpeciesReport, +) -> shapely.Geometry: + if len(geometries_data) == 0: + raise ValueError("No geometries in DB") + report.has_geometries = True + + geometry = None + for geometry_row in geometries_data: + assert len(geometry_row) == 1 + row_geometry = geometry_row[0] + if row_geometry is None: + continue + + grange = shapely.normalize(shapely.from_wkb(row_geometry.to_ewkb())) + if grange.area == 0.0: + continue + + if geometry is None: + geometry = grange + else: + geometry = shapely.union(geometry, grange) + + if geometry is None: + raise ValueError("None geometry data in DB") + report.keeps_geometries = True + + return geometry + + def process_systems( systems_data: list[tuple], report: SpeciesReport, diff --git a/prepare_species/extract_species_data_psql.py b/prepare_species/extract_species_data_psql.py index a437df2..4044ce4 100644 --- a/prepare_species/extract_species_data_psql.py +++ b/prepare_species/extract_species_data_psql.py @@ -12,7 +12,6 @@ import geopandas as gpd import pandas as pd import psycopg2 -import shapely from postgis.psycopg import register from common import ( @@ -21,6 +20,7 @@ process_habitats, process_threats, process_systems, + process_geometries, tidy_reproject_save, SpeciesReport, ) @@ -114,36 +114,6 @@ f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" ) -def process_geometries( - geometries_data: list[tuple[int, shapely.Geometry]], - report: SpeciesReport, -) -> shapely.Geometry: - if len(geometries_data) == 0: - raise ValueError("No geometries in DB") - report.has_geometries = True - - geometry = None - for geometry_row in geometries_data: - assert len(geometry_row) == 1 - row_geometry = geometry_row[0] - if row_geometry is None: - continue - - grange = shapely.normalize(shapely.from_wkb(row_geometry.to_ewkb())) - if grange.area == 0.0: - continue - - if geometry is None: - geometry = grange - else: - geometry = shapely.union(geometry, grange) - - if geometry is None: - raise ValueError("None geometry data in DB") - report.keeps_geometries = True - - return geometry - def process_row( class_name: str, output_directory_path: Path, diff --git a/prepare_species/extract_species_data_redlist.py b/prepare_species/extract_species_data_redlist.py index eef5ffc..3eb8861 100644 --- a/prepare_species/extract_species_data_redlist.py +++ b/prepare_species/extract_species_data_redlist.py @@ -10,22 +10,22 @@ ============== 1. 
Download IUCN Red List Spatial Data: - - Go to: https://www.iucnredlist.org/resources/spatial-data-download - - Create an account (free for non-commercial use) - - Download the shapefile for your target taxonomic group(s): - * MAMMALS - * AMPHIBIANS - * REPTILES - * BIRDS - - Extract the shapefiles to a known location on your computer - - Note: Each download contains a shapefile (.shp) and associated files (.dbf, .shx, .prj) + - Go to: https://www.iucnredlist.org/resources/spatial-data-download + - Create an account (free for non-commercial use) + - Download the shapefile for your target taxonomic group(s): + * MAMMALS + * AMPHIBIANS + * REPTILES + * BIRDS + - Extract the shapefiles to a known location on your computer + - Note: Each download contains a shapefile (.shp) and associated files (.dbf, .shx, .prj) 2. Get an IUCN Red List API Token: - - Go to: https://api.iucnredlist.org/users/sign_up - - Sign up for a free API account - - You'll receive an API token via email - - Set the token as an environment variable: - export IUCN_REDLIST_TOKEN="your_token_here" + - Go to: https://api.iucnredlist.org/users/sign_up + - Sign up for a free API account + - You'll receive an API token via email + - Set the token as an environment variable: + export IUCN_REDLIST_TOKEN="your_token_here" USAGE: ====== @@ -70,6 +70,7 @@ CATEGORY_WEIGHTS, COLUMNS, process_habitats, + process_geometries, process_threats, process_systems, tidy_reproject_save, @@ -117,6 +118,14 @@ def process_habitats_from_api(assessment: dict, report: SpeciesReport) -> set: codes = [['|'.join(codes_list)]] return process_habitats(codes, report) +def process_geometries_from_api(geometries: pd.DataFrame, report: SpeciesReport) -> shapely.Geometry: + # Reworking data to look like database data for common code + reformated_geometries = [] + for _, row in geometries.iterrows(): + geom = row.geometry + reformated_geometries.append((geom,)) + return process_geometries(reformated_geometries, report) + def get_elevation_from_api(assessment: dict, report: SpeciesReport) -> tuple: """Extract elevation limits from API response.""" supplementary_info = assessment.get('supplementary_info', {}) @@ -223,29 +232,7 @@ def process_species( # Filter by presence codes (now that we know if species is possibly extinct) filtered_gdf = species_gdf[species_gdf['presence'].isin(presence_filter)] - if len(filtered_gdf) == 0: - logger.debug("Dropping %s: no geometries after presence filtering", id_no) - return report - - geometries = [] - for _, row in filtered_gdf.iterrows(): - geom = row.geometry - if geom is not None and not geom.is_empty: - if not geom.is_valid: - geom = geom.buffer(0) - geometries.append(geom) - - if len(geometries) == 0: - logger.debug("Dropping %s: no valid geometries", id_no) - return report - - report.has_geometry = True - - # Union all geometries - if len(geometries) == 1: - geometry = geometries[0] - else: - geometry = shapely.union_all(geometries) + geometry = process_geometries_from_api(filtered_gdf, report) # Create GeoDataFrame with all data gdf = gpd.GeoDataFrame( diff --git a/tests/test_species_filter.py b/tests/test_species_filter.py index b00a538..d7fb9fe 100644 --- a/tests/test_species_filter.py +++ b/tests/test_species_filter.py @@ -2,7 +2,7 @@ import postgis import shapely -from prepare_species.extract_species_data_psql import process_habitats, process_geometries, \ +from prepare_species.common import process_habitats, process_geometries, \ process_threats, SpeciesReport def test_empty_report() -> None: From 
8bec27f8274d591984955bb3c7c789d1b089eae6 Mon Sep 17 00:00:00 2001
From: Michael Dales
Date: Thu, 23 Oct 2025 07:29:32 +0100
Subject: [PATCH 07/12] Rework how we use redlistapi a little

---
 prepare_species/common.py                    | 12 +--
 prepare_species/extract_species_data_psql.py |  7 +-
 .../extract_species_data_redlist.py          | 83 +++++++++----------
 3 files changed, 49 insertions(+), 53 deletions(-)

diff --git a/prepare_species/common.py b/prepare_species/common.py
index 91e400b..a543c60 100644
--- a/prepare_species/common.py
+++ b/prepare_species/common.py
@@ -75,6 +75,7 @@ class SpeciesReport:
         "id_no",
         "assessment_id",
         "scientific_name",
+        "has_api_data",
         "possibly_extinct",
         "has_systems",
         "not_terrestrial_system",
@@ -112,7 +113,7 @@ def as_row(self) -> list:
         return [self.info[k] for k in self.REPORT_COLUMNS]
 
 def process_geometries(
-    geometries_data: list[tuple[shapely.Geometry]],
+    geometries_data: list[shapely.Geometry],
     report: SpeciesReport,
 ) -> shapely.Geometry:
     if len(geometries_data) == 0:
@@ -120,13 +121,8 @@ def process_geometries(
     report.has_geometries = True
 
     geometry = None
-    for geometry_row in geometries_data:
-        assert len(geometry_row) == 1
-        row_geometry = geometry_row[0]
-        if row_geometry is None:
-            continue
-
-        grange = shapely.normalize(shapely.from_wkb(row_geometry.to_ewkb()))
+    for geometry_row in geometries_data:
+        grange = shapely.normalize(geometry_row)
         if grange.area == 0.0:
             continue
 
diff --git a/prepare_species/extract_species_data_psql.py b/prepare_species/extract_species_data_psql.py
index 4044ce4..244cbea 100644
--- a/prepare_species/extract_species_data_psql.py
+++ b/prepare_species/extract_species_data_psql.py
@@ -130,6 +130,7 @@ def process_row(
         elevation_lower, elevation_upper, scientific_name, family_name, category = row
 
     report = SpeciesReport(id_no, assessment_id, scientific_name)
+    report.has_api_data = True
 
     # From Chess STAR report
     if possibly_extinct or possibly_extinct_in_the_wild:
@@ -159,8 +160,12 @@ def process_row(
             cursor.execute(GEOMETRY_STATEMENT, (assessment_id, presence))
             geometries_data = cursor.fetchall()
 
+    cleaned_geometries = [
+        shapely.from_wkb(row_geometry[0].to_ewkb())
+        for row_geometry in geometries_data if row_geometry[0] is not None
+    ]
     try:
-        geometry = process_geometries(geometries_data, report)
+        geometry = process_geometries(cleaned_geometries, report)
     except ValueError as _exc:
         return report
 
diff --git a/prepare_species/extract_species_data_redlist.py b/prepare_species/extract_species_data_redlist.py
index 3eb8861..51b2e89 100644
--- a/prepare_species/extract_species_data_redlist.py
+++ b/prepare_species/extract_species_data_redlist.py
@@ -55,8 +55,6 @@
 import logging
 import os
 import sys
-from functools import partial
-from multiprocessing import Pool
 from pathlib import Path
 from typing import Optional
 
@@ -82,23 +80,15 @@
 logger.setLevel(logging.DEBUG)
 
 
-def process_systems_from_api(assessment: dict, report: SpeciesReport) -> list:
+def process_systems_from_api(assessment: redlistapi.Assessment, report: SpeciesReport) -> list:
     """Extract and validate systems data from API response."""
-    # The assessment_as_pandas() returns terrestrial, freshwater, marine as boolean columns
-    systems_list = []
-    if assessment['terrestrial']:
-        systems_list.append("Terrestrial")
-    if assessment['freshwater']:
-        systems_list.append("Freshwater")
-    if assessment['marine']:
-        systems_list.append("Marine")
-
-    systems = "|".join(systems_list)
+    systems_data = assessment.systems_as_pandas().to_dict(orient='records')
+    systems = "|".join([x['description'] for x in 
systems_data])
     return process_systems([(systems,)], report)
 
-def process_threats_from_api(assessment: dict, report: SpeciesReport) -> list:
+def process_threats_from_api(assessment: redlistapi.Assessment, report: SpeciesReport) -> list:
     """Extract and process threat data from API, applying STAR weighting."""
-    threats_data = assessment.get('threats', [])
+    threats_data = assessment.threats_as_pandas().to_dict(orient='records')
 
     # API uses underscores (e.g., "2_3_2") but we need dots (e.g., "2.3.2") for consistency with DB format
     threats = [(
@@ -109,9 +99,8 @@ def process_threats_from_api(assessment: dict, report: SpeciesReport) -> list:
 
     return process_threats(threats, report)
 
-def process_habitats_from_api(assessment: dict, report: SpeciesReport) -> set:
-    """Extract habitat codes from API response."""
-    habitats_data = assessment.get('habitats', [])
+def process_habitats_from_api(assessment: redlistapi.Assessment, report: SpeciesReport) -> set:
+    habitats_data = assessment.habitats_as_pandas().to_dict(orient='records')
     # API uses underscores (e.g., "1_5") but we need dots (e.g., "1.5") for consistency with DB format
     # Convert codes and join with pipe separator to match DB format
    codes_list = [habitat.get('code', '').replace('_', '.') for habitat in habitats_data]
@@ -123,7 +112,7 @@ def process_geometries_from_api(geometries: pd.DataFrame, report: SpeciesReport)
     reformated_geometries = []
     for _, row in geometries.iterrows():
         geom = row.geometry
-        reformated_geometries.append((geom,))
+        reformated_geometries.append(geom)
     return process_geometries(reformated_geometries, report)
 
 def get_elevation_from_api(assessment: dict, report: SpeciesReport) -> tuple:
@@ -173,10 +162,14 @@ def process_species(
     # use `from_taxid`
     try:
         factory = redlistapi.AssessmentFactory(token)
-        assessment = factory.from_taxid(id_no)
+        assessment = factory.from_taxid(id_no, scope='1')
         report.has_api_data = True
-    except (requests.exceptions.RequestException, ValueError) as e:
-        logger.error("Failed to get API data for %s: %s", scientific_name, e)
+    except ValueError as exc:
+        logger.error("Failed to get API data for %s: %s", scientific_name, exc)
+        raise
+    except requests.exceptions.RequestException as exc:
+        logger.error("Network error for %s: %s", scientific_name, exc)
         return report
 
     # Whilst you can do `assessment.assessment` to get the original data as a dict,
@@ -192,10 +185,18 @@ def process_species(
         possibly_extinct_in_the_wild = assessment_dict['possibly_extinct_in_the_wild']
+        infrarank = assessment_dict['infrarank']
     except KeyError as exc:
         logger.error("Failed to get data from assessment record for %s: %s", id_no, exc)
         return report
 
+    # Remove subspecies and non-global assessments. This happens in the original SQL in the Postgres
+    # version. Note that the "True" is because the redlistapi package casts this to a string with
+    # the values of "True", "False", and "None". 
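+    # Illustratively (assuming the string-casting described above): a
+    # species-level global assessment arrives with infrarank == "None" and is
+    # kept, while a subspecies arrives with infrarank == "True" and is dropped.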
+ if infrarank == "True": + logger.debug("Dropping %s: infrarank not none: %s", id_no, infrarank, infrarank.__class__) + return report + report.assessment_id = assessment_id # From Chess STAR report: adjust presence filter for possibly extinct species @@ -211,19 +212,23 @@ def process_species( # Process systems try: - systems = process_systems_from_api(assessment_dict, report) + systems = process_systems_from_api(assessment, report) except ValueError as exc: logger.debug("Dropping %s: %s", id_no, exc) return report # Process threats - threats = process_threats_from_api(assessment_dict, report) + threats = process_threats_from_api(assessment, report) if len(threats) == 0: logger.debug("Dropping %s: no threats", id_no) return report # Process habitats - habitats = process_habitats_from_api(assessment_dict, report) + try: + habitats = process_habitats_from_api(assessment, report) + except ValueError as exc: + logger.debug("Dropping %s: %s", id_no, exc) + return report # Get elevation elevation_lower, elevation_upper = get_elevation_from_api(assessment_dict, report) @@ -232,7 +237,11 @@ def process_species( # Filter by presence codes (now that we know if species is possibly extinct) filtered_gdf = species_gdf[species_gdf['presence'].isin(presence_filter)] - geometry = process_geometries_from_api(filtered_gdf, report) + try: + geometry = process_geometries_from_api(filtered_gdf, report) + except ValueError as exc: + logger.debug("Dropping %s: %s", id_no, exc) + return report # Create GeoDataFrame with all data gdf = gpd.GeoDataFrame( @@ -361,24 +370,10 @@ def extract_data_from_shapefile( logger.info("Processing %d species for %s in %s scenario", len(species_groups), class_name, era) - # Process species - using multiprocessing - # Note: We use fewer processes than the DB version because we're making API calls - with Pool(processes=5) as pool: - reports = pool.map( - partial( - process_species, - token, - class_name, - era_output_directory_path, - target_projection, - presence_filter, - ), - species_groups - ) - # reports = [ - # process_species(token, class_name, era_output_directory_path, target_projection, presence_filter, species) - # for species in species_groups - # ] + reports = [ + process_species(token, class_name, era_output_directory_path, target_projection, presence_filter, species) + for species in species_groups + ] reports_df = pd.DataFrame( [x.as_row() for x in reports], From 15dcbb0d6a9cb27dc9f7eddc5a6fe6700e20129c Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Fri, 24 Oct 2025 13:12:29 +0100 Subject: [PATCH 08/12] Add rate limit --- .../extract_species_data_redlist.py | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/prepare_species/extract_species_data_redlist.py b/prepare_species/extract_species_data_redlist.py index 51b2e89..7e3180b 100644 --- a/prepare_species/extract_species_data_redlist.py +++ b/prepare_species/extract_species_data_redlist.py @@ -55,6 +55,7 @@ import logging import os import sys +import time from pathlib import Path from typing import Optional @@ -95,7 +96,7 @@ def process_threats_from_api(assessment: redlistapi.Assessment, report: SpeciesR threat.get('code', '').replace('_', '.'), threat.get('scope'), threat.get('severity'), - ) for threat in threats_data] + ) for threat in threats_data if threat.get('timing') != 'Past, Unlikely to Return'] return process_threats(threats, report) @@ -281,6 +282,7 @@ def extract_data_from_shapefile( target_projection: Optional[str], presence_filter: tuple[int, ...], origin_filter: 
+    rate_limit: float,
 ) -> None:
     """
     Extract species data from IUCN shapefile(s) combined with API data.
@@ -370,10 +372,19 @@
 
     logger.info("Processing %d species for %s in %s scenario", len(species_groups), class_name, era)
 
-    reports = [
-        process_species(token, class_name, era_output_directory_path, target_projection, presence_filter, species)
-        for species in species_groups
-    ]
+    reports = []
+    last = 0.0
+    for species in species_groups:
+
+        # The IUCN Redlist API v4 is currently rate limited to 120 requests per minute
+        now = time.time()
+        time_since_last_call = now - last
+        if time_since_last_call < rate_limit:
+            time.sleep(rate_limit - time_since_last_call)
+        last = time.time()
+
+        result = process_species(token, class_name, era_output_directory_path, target_projection, presence_filter, species)
+        reports.append(result)
 
     reports_df = pd.DataFrame(
         [x.as_row() for x in reports],
@@ -443,6 +454,12 @@
         help="Comma-separated origin codes to include (default: 1,2,6)",
         default="1,2,6",
     )
+    parser.add_argument(
+        '--rate_limit',
+        type=float,
+        help="Minimum time in seconds between Redlist API requests (default: 0.5)",
+        default=0.5,
+    )
 
     args = parser.parse_args()
 
@@ -480,6 +497,7 @@
         args.target_projection,
         presence_filter,
         origin_filter,
+        args.rate_limit,
     )
 

From 733dbe11b3548442c5690d9785b24b4df4343a47 Mon Sep 17 00:00:00 2001
From: Michael Dales
Date: Fri, 24 Oct 2025 13:24:52 +0100
Subject: [PATCH 09/12] Fix linter issues

---
 README.md                                       |  7 +++++++
 prepare_species/extract_species_data_psql.py    |  2 +-
 prepare_species/extract_species_data_redlist.py | 14 ++++++++++----
 3 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index b5b02e2..2974a93 100644
--- a/README.md
+++ b/README.md
@@ -23,6 +23,13 @@ There are some additional inputs required to run the pipeline, which should be p
 
 The script also assumes you have a Postgres database with the IUCN Redlist database in it.
 
+
+## Species data acquisition
+
+There are two scripts for getting the species data from the Redlist. For those in the IUCN with access to the database version of the Redlist, use `extract_species_data_psql.py`.
+
+For those outside the IUCN, there is a script called `extract_species_data_redlist.py` that gets the data via the [V4 Redlist API](https://api.iucnredlist.org). You will need an API key, which you can request via the API website [by signing up](https://api.iucnredlist.org/users/sign_up). Once you have that, you will still need to download the species ranges, as they are not available from the API: before running the script you must [go to the spatial data portal](https://www.iucnredlist.org/resources/spatial-data-download) and download the files for the taxa you are interested in.
+
 ## Running the pipeline
 
 There are two ways to run the pipeline. The easiest way is to use Docker if you have it available to you, as it will manage all the dependencies for you. But you can check out and run it locally if you want to also, but it requires a little more effort.
diff --git a/prepare_species/extract_species_data_psql.py b/prepare_species/extract_species_data_psql.py
index 244cbea..559161c 100644
--- a/prepare_species/extract_species_data_psql.py
+++ b/prepare_species/extract_species_data_psql.py
@@ -161,7 +161,7 @@ def process_row(
     cursor.execute(GEOMETRY_STATEMENT, (assessment_id, presence))
     geometries_data = cursor.fetchall()
     cleaned_geometries = [
-        shapely.from_wkb(row_geometry[0].to_ewkb()
+        shapely.from_wkb(row_geometry[0].to_ewkb())
         for row_geometry in geometries if row_geometry[0] is not None
     ]
     try:
diff --git a/prepare_species/extract_species_data_redlist.py b/prepare_species/extract_species_data_redlist.py
index 7e3180b..fc70241 100644
--- a/prepare_species/extract_species_data_redlist.py
+++ b/prepare_species/extract_species_data_redlist.py
@@ -166,11 +166,10 @@
         assessment = factory.from_taxid(id_no, scope='1')
         report.has_api_data = True
     except (ValueError) as exc:
-        logger.error("Failed to get API data for %s: %s", scientific_name, exc)
-        raise
+        logger.error("Failed to get API data for %s: %s", id_no, exc)
         return report
     except (requests.exceptions.RequestException) as exc:
-        logger.error("Netowrk error: %s", scientific_name, exc)
+        logger.error("Network error for %s: %s", id_no, exc)
         return report
 
@@ -383,7 +382,14 @@
             time.sleep(rate_limit - time_since_last_call)
         last = time.time()
 
-        result = process_species(token, class_name, era_output_directory_path, target_projection, presence_filter, species)
+        result = process_species(
+            token,
+            class_name,
+            era_output_directory_path,
+            target_projection,
+            presence_filter,
+            species
+        )
         reports.append(result)
 
     reports_df = pd.DataFrame(

From 0812f21801f72321c645c0def4f3be53dab249f2 Mon Sep 17 00:00:00 2001
From: Michael Dales
Date: Fri, 24 Oct 2025 13:28:32 +0100
Subject: [PATCH 10/12] Fix psql script

---
 prepare_species/extract_species_data_psql.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/prepare_species/extract_species_data_psql.py b/prepare_species/extract_species_data_psql.py
index 559161c..c282028 100644
--- a/prepare_species/extract_species_data_psql.py
+++ b/prepare_species/extract_species_data_psql.py
@@ -12,6 +12,7 @@
 import geopandas as gpd
 import pandas as pd
 import psycopg2
+import shapely
 from postgis.psycopg import register
 
 from common import (
@@ -162,7 +163,7 @@ def process_row(
     geometries_data = cursor.fetchall()
     cleaned_geometries = [
         shapely.from_wkb(row_geometry[0].to_ewkb())
-        for row_geometry in geometries if row_geometry[0] is not None
+        for row_geometry in geometries_data if row_geometry[0] is not None
     ]
     try:
         geometry = process_geometries(cleaned_geometries, report)

From 2ab33a28381d4a8acc4ce69088f226851436b0be Mon Sep 17 00:00:00 2001
From: Michael Dales
Date: Fri, 24 Oct 2025 13:33:41 +0100
Subject: [PATCH 11/12] Fix linter issues

---
 prepare_species/extract_species_data_redlist.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/prepare_species/extract_species_data_redlist.py b/prepare_species/extract_species_data_redlist.py
index fc70241..41cfd89 100644
--- a/prepare_species/extract_species_data_redlist.py
+++ b/prepare_species/extract_species_data_redlist.py
@@ -194,7 +194,7 @@ def process_species(
     # version. Note that the "True" is because the redlistapi package casts this to a string with
     # the values of "True", "False", and "None".
     if infrarank == "True":
-        logger.debug("Dropping %s: infrarank not none: %s", id_no, infrarank, infrarank.__class__)
+        logger.debug("Dropping %s: infrarank not none: %s", id_no, infrarank)
         return report
 
     report.assessment_id = assessment_id

From 5c247330773fd3f18e33e6095c1b00cc8c73e389 Mon Sep 17 00:00:00 2001
From: Michael Dales
Date: Mon, 27 Oct 2025 15:08:04 +0000
Subject: [PATCH 12/12] Fix test issues

---
 prepare_species/common.py    |  4 ++--
 tests/test_species_filter.py | 24 ++++++++++++++++++------
 2 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/prepare_species/common.py b/prepare_species/common.py
index a543c60..b8bb284 100644
--- a/prepare_species/common.py
+++ b/prepare_species/common.py
@@ -121,8 +121,8 @@ def process_geometries(
     report.has_geometries = True
 
     geometry = None
-    for geometry in geometries_data:
-        grange = shapely.normalize(geometry)
+    for geom in geometries_data:
+        grange = shapely.normalize(geom)
         if grange.area == 0.0:
             continue
 
diff --git a/tests/test_species_filter.py b/tests/test_species_filter.py
index d7fb9fe..5d56910 100644
--- a/tests/test_species_filter.py
+++ b/tests/test_species_filter.py
@@ -65,33 +65,45 @@ def test_empty_geometry_list():
     assert not report.keeps_geometries
 
 def test_simple_resident_species_geometry_filter_point():
-    geoemetries_data = [
+    geometries_data = [
         (postgis.Geometry.from_ewkb("000000000140000000000000004010000000000000"),),
     ]
+    cleaned_geometries = [
+        shapely.from_wkb(row_geometry[0].to_ewkb())
+        for row_geometry in geometries_data if row_geometry[0] is not None
+    ]
     report = SpeciesReport(1, 2, "name")
     with pytest.raises(ValueError):
-        _ = process_geometries(geoemetries_data, report)
+        _ = process_geometries(cleaned_geometries, report)
     assert report.has_geometries
     assert not report.keeps_geometries
 
 def test_simple_resident_species_geometry_filter_polygon():
-    geoemetries_data = [
+    geometries_data = [
         (postgis.Geometry.from_ewkb("0103000000010000000400000000000000000000000000000000000000000000000000F03F000000000000F03F9A9999999999B93F000000000000F03F00000000000000000000000000000000"),), # pylint: disable=C0301
     ]
+    cleaned_geometries = [
+        shapely.from_wkb(row_geometry[0].to_ewkb())
+        for row_geometry in geometries_data if row_geometry[0] is not None
+    ]
     report = SpeciesReport(1, 2, "name")
-    res = process_geometries(geoemetries_data, report)
+    res = process_geometries(cleaned_geometries, report)
     assert res == shapely.Polygon([(0.0, 0.0), (0.1, 1.0), (1.0, 1.0), (0.0, 0.0)])
     assert report.has_geometries
     assert report.keeps_geometries
 
 def test_simple_migratory_species_geometry_filter():
-    geoemetries_data = [
+    geometries_data = [
         (postgis.Geometry.from_ewkb("0103000000010000000400000000000000000000000000000000000000000000000000F03F000000000000F03F9A9999999999B93F000000000000F03F00000000000000000000000000000000"),), # pylint: disable=C0301
         (postgis.Geometry.from_ewkb("0103000000010000000400000000000000000000000000000000000000000000000000F03F000000000000F03F9A9999999999B93F000000000000F03F00000000000000000000000000000000"),), # pylint: disable=C0301
     ]
+    cleaned_geometries = [
+        shapely.from_wkb(row_geometry[0].to_ewkb())
+        for row_geometry in geometries_data if row_geometry[0] is not None
+    ]
     report = SpeciesReport(1, 2, "name")
-    res = process_geometries(geoemetries_data, report)
+    res = process_geometries(cleaned_geometries, report)
    assert res == shapely.Polygon([(0.1, 1.0), (1.0, 1.0), (0.0, 0.0), (0.1, 1.0)])
     assert report.has_geometries
     assert report.keeps_geometries
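
A note on the request pacing introduced in [PATCH 08/12]: the loop sleeps so that
successive Redlist API calls are at least `rate_limit` seconds apart, and the
default of 0.5 seconds keeps the script within the API's limit of 120 requests
per minute. The same idea can be factored into a small standalone helper. The
sketch below is illustrative only and is not part of the patches; RequestPacer
is a hypothetical name, and its 0.5 second default simply mirrors the script's
--rate_limit default.

    import time

    class RequestPacer:
        """Enforce a minimum interval between successive calls."""

        def __init__(self, min_interval: float = 0.5):
            self.min_interval = min_interval
            self._last = 0.0

        def wait(self) -> None:
            # Sleep for whatever remains of the minimum interval since the
            # previous call was released.
            elapsed = time.time() - self._last
            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)
            # Record the post-sleep time, so the interval is measured between
            # the moments callers actually proceed to make their requests.
            self._last = time.time()

Calling wait() immediately before each API request (for example, before each
factory.from_taxid() call) then guarantees the configured spacing regardless of
how long the rest of the loop body takes.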