diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 2751131..db7294b 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -36,7 +36,7 @@ jobs:
         python -m pip install -r requirements.txt
     - name: Lint with pylint
       run: |
-        python3 -m pylint utils prepare_layers prepare_species
+        python3 -m pylint utils prepare_layers prepare_species threats
     - name: Tests
       run: |
         python3 -m pytest ./tests
diff --git a/prepare_species/extract_species_data_psql.py b/prepare_species/extract_species_data_psql.py
index 61abe34..8c86668 100644
--- a/prepare_species/extract_species_data_psql.py
+++ b/prepare_species/extract_species_data_psql.py
@@ -1,5 +1,6 @@
 import argparse
 import importlib
+import json
 import logging
 import math
 import os
@@ -38,10 +39,24 @@
     "scientific_name",
     "family_name",
     "class_name",
+    "threats",
     "category",
+    "category_weight",
     "geometry"
 ]
 
+# From Muir et al: For each species, a global STAR threat abatement (START) score
+# is defined. This varies from zero for species of Least Concern to 100
+# for Near Threatened, 200 for Vulnerable, 300 for Endangered and
+# 400 for Critically Endangered species (using established weighting
+# ratios [7,8])
+CATEGORY_WEIGHTS = {
+    'NT': 100,
+    'VU': 200,
+    'EN': 300,
+    'CR': 400,
+}
+
 MAIN_STATEMENT = """
 SELECT
     assessments.sis_taxon_id as id_no,
@@ -84,10 +99,12 @@
 
 THREATS_STATEMENT = """
 SELECT
-    supplementary_fields->>'scope' AS scope,
-    supplementary_fields->>'severity' AS severity
+    threat_lookup.code,
+    assessment_threats.supplementary_fields->>'scope' AS scope,
+    assessment_threats.supplementary_fields->>'severity' AS severity
 FROM
     assessment_threats
+    LEFT JOIN threat_lookup ON assessment_threats.threat_id = threat_lookup.id
 WHERE
     assessment_id = %s
     AND (supplementary_fields->>'timing' is NULL OR supplementary_fields->>'timing' <> 'Past, Unlikely to Return')
@@ -225,7 +242,7 @@ def process_systems(
 ]
 DEFAULT_SEVERITY = "slow, significant declines"
 
-# Taken from Muir et al 2021, indexed by SCOPE and then SEVERITY
+# Taken from Muir et al 2021 Supplementary Table 2, indexed by SCOPE and then SEVERITY
 THREAT_WEIGHTING_TABLE = [
     [63, 24, 10, 1, 0, 10],
     [52, 18, 9, 0, 0, 9],
@@ -236,8 +253,8 @@ def process_threats(
     threat_data: List,
     report: SpeciesReport,
-) -> bool:
-    total = 0
-    for scope, severity in threat_data:
+) -> List:
+    cleaned_threats = []
+    for code, scope, severity in threat_data:
         if scope is None or scope.lower() == "unknown":
             scope = DEFAULT_SCOPE
         if severity is None or severity.lower() == "unknown":
@@ -245,9 +262,10 @@
         scope_index = SCOPES.index(scope.lower())
         severity_index = SEVERITIES.index(severity.lower())
         score = THREAT_WEIGHTING_TABLE[scope_index][severity_index]
-        total += score
-    report.has_threats = total != 0
-    return total != 0
+        if score > 0:
+            cleaned_threats.append((code, score))
+    report.has_threats = len(cleaned_threats) > 0
+    return cleaned_threats
 
 def process_habitats(
     habitats_data: List[List[str]],
@@ -329,7 +347,6 @@ def process_row(
         presence += (4,)
         report.possibly_extinct = True # pylint: disable=W0201
 
-
     cursor.execute(SYSTEMS_STATEMENT, (assessment_id,))
     systems_data = cursor.fetchall()
     try:
@@ -340,8 +357,8 @@
 
     cursor.execute(THREATS_STATEMENT, (assessment_id,))
     raw_threats = cursor.fetchall()
-    threatened = process_threats(raw_threats, report)
-    if not threatened:
+    threats = process_threats(raw_threats, report)
+    if len(threats) == 0:
         return report
 
     cursor.execute(HABITATS_STATEMENT, (assessment_id,))
@@ -370,7 +387,9 @@ def process_row(
         scientific_name,
         family_name,
         class_name,
+        json.dumps(threats),
         category,
+        CATEGORY_WEIGHTS[category],
         geometry
     ]], columns=COLUMNS,
diff --git a/tests/test_species_filter.py b/tests/test_species_filter.py
index d2c39cc..b00a538 100644
--- a/tests/test_species_filter.py
+++ b/tests/test_species_filter.py
@@ -105,7 +105,7 @@ def test_empty_threat_list():
 
 def test_no_serious_threats():
     threats_data = [
-        ("Minority (<50%)", "No decline"),
+        ("3.5", "Minority (<50%)", "No decline"),
     ]
     report = SpeciesReport(1, 2, "name")
     res = process_threats(threats_data, report)
@@ -114,19 +114,19 @@ def test_serious_threats():
     threats_data = [
-        ("Whole (>90%)", "Very rapid declines"),
+        ("3.4", "Whole (>90%)", "Very rapid declines"),
     ]
     report = SpeciesReport(1, 2, "name")
     res = process_threats(threats_data, report)
-    assert res
+    assert res == [("3.4", 63)]
     assert report.has_threats
 
 def test_mixed_threats():
     threats_data = [
-        ("Whole (>90%)", "Very rapid declines"),
-        ("Minority (<50%)", "No decline"),
+        ("3.4", "Whole (>90%)", "Very rapid declines"),
+        ("3.5", "Minority (<50%)", "No decline"),
     ]
     report = SpeciesReport(1, 2, "name")
     res = process_threats(threats_data, report)
-    assert res
+    assert res == [("3.4", 63)]
    assert report.has_threats
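A worked example of the scoring this patch encodes, as a minimal sketch (all values are taken from CATEGORY_WEIGHTS, THREAT_WEIGHTING_TABLE and the tests above; variable names are illustrative only):

    # An Endangered (EN) species carries a category weight of 300. A threat with
    # scope "Whole (>90%)" and severity "Very rapid declines" indexes to 63 in
    # THREAT_WEIGHTING_TABLE, while "Minority (<50%)" / "No decline" indexes to 0
    # and is dropped, so only scoring threats reach the output geojson.
    CATEGORY_WEIGHTS = {'NT': 100, 'VU': 200, 'EN': 300, 'CR': 400}

    threats = [("3.4", 63)]                  # process_threats result, as in the tests
    category_weight = CATEGORY_WEIGHTS['EN']

    # threats/threat_processing.py later splits the species weight by score share:
    total_threat_weight = sum(score for _, score in threats)
    shares = {code: category_weight * score / total_threat_weight
              for code, score in threats}
    assert shares == {"3.4": 300.0}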
diff --git a/threats/threat_processing.py b/threats/threat_processing.py
new file mode 100644
index 0000000..370fcac
--- /dev/null
+++ b/threats/threat_processing.py
@@ -0,0 +1,83 @@
+import argparse
+import json
+import os
+import sys
+
+import geopandas as gpd
+from pyogrio.errors import DataSourceError
+from yirgacheffe.layers import RasterLayer
+
+def threat_processing_per_species(
+    species_data_path: str,
+    aoh_path: str,
+    output_directory_path: str,
+) -> None:
+    try:
+        data = gpd.read_file(species_data_path)
+    except DataSourceError:
+        sys.exit(f"Failed to read {species_data_path}")
+
+    with RasterLayer.layer_from_file(aoh_path) as aoh:
+
+        os.makedirs(output_directory_path, exist_ok=True)
+
+        taxon_id = data.id_no[0]
+        category_weight = int(data.category_weight[0])
+        threat_data = json.loads(data.threats[0])
+
+        try:
+            aoh_base, _ = os.path.splitext(aoh_path)
+            aoh_data_path = aoh_base + ".json"
+            with open(aoh_data_path, "r", encoding="UTF-8") as f:
+                aoh_data = json.load(f)
+            aoh_total = aoh_data["aoh_total"]
+        except (FileNotFoundError, KeyError):
+            aoh_total = aoh.sum()
+
+        proportional_aoh_per_pixel = aoh / aoh_total
+        weighted_species = proportional_aoh_per_pixel * category_weight
+
+        total_threat_weight = sum(x[1] for x in threat_data)
+        for threat_id, weight in threat_data:
+            proportional_threat_weight = weight / total_threat_weight
+            per_threat_per_species_score = weighted_species * proportional_threat_weight
+
+            threat_dir_path = os.path.join(output_directory_path, str(threat_id))
+            os.makedirs(threat_dir_path, exist_ok=True)
+            output_path = os.path.join(threat_dir_path, f"{taxon_id}.tif")
+            with RasterLayer.empty_raster_layer_like(aoh, filename=output_path) as result:
+                per_threat_per_species_score.save(result)
+
+def main() -> None:
+    parser = argparse.ArgumentParser(description="Calculate per species threat layers")
+    parser.add_argument(
+        '--speciesdata',
+        type=str,
+        help="Single species/seasonality geojson.",
+        required=True,
+        dest="species_data_path"
+    )
+    parser.add_argument(
+        '--aoh',
+        type=str,
+        help="AoH raster of species.",
+        required=True,
+        dest="aoh_path"
+    )
+    parser.add_argument(
+        '--output',
+        type=str,
+        help='Directory where per species/threat layers are stored',
+        required=True,
+        dest='output_directory_path',
+    )
+    args = parser.parse_args()
+
+    threat_processing_per_species(
+        args.species_data_path,
+        args.aoh_path,
+        args.output_directory_path,
+    )
+
+if __name__ == "__main__":
+    main()
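To make the per-pixel arithmetic in threat_processing_per_species concrete, here is a minimal numpy sketch (hypothetical raster values; the real code performs the equivalent operations through yirgacheffe layers rather than in-memory arrays):

    import numpy as np

    aoh = np.array([[2.0, 0.0],
                    [1.0, 1.0]])             # hypothetical AoH raster
    aoh_total = aoh.sum()                    # 4.0
    category_weight = 200                    # a Vulnerable (VU) species
    threats = [("2.1", 52), ("3.4", 63)]     # (code, score) pairs from the geojson

    weighted_species = (aoh / aoh_total) * category_weight
    total_threat_weight = sum(score for _, score in threats)
    per_threat = {code: weighted_species * (score / total_threat_weight)
                  for code, score in threats}

    # The per-threat layers partition the species weight: summed over all
    # threats and all pixels, they recover category_weight.
    combined = sum(layer.sum() for layer in per_threat.values())
    assert abs(combined - category_weight) < 1e-9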
diff --git a/threats/threat_summation.py b/threats/threat_summation.py
new file mode 100644
index 0000000..54bf6a5
--- /dev/null
+++ b/threats/threat_summation.py
@@ -0,0 +1,223 @@
+import argparse
+import os
+import sys
+import tempfile
+import time
+from multiprocessing import Manager, Process, Queue, cpu_count
+from pathlib import Path
+from typing import List
+
+from yirgacheffe.layers import RasterLayer  # type: ignore
+from osgeo import gdal
+
+gdal.SetCacheMax(1024 * 1024 * 32)
+
+def worker(
+    filename: str,
+    result_dir: str,
+    input_queue: Queue,
+) -> None:
+    output_tif = os.path.join(result_dir, filename)
+
+    merged_result = None
+
+    while True:
+        path = input_queue.get()
+        if path is None:
+            break
+
+        with RasterLayer.layer_from_file(path) as partial_raster:
+            if merged_result is None:
+                merged_result = RasterLayer.empty_raster_layer_like(partial_raster)
+                cleaned_raster = partial_raster.nan_to_num()
+                cleaned_raster.save(merged_result)
+            else:
+                calc = merged_result + partial_raster.nan_to_num()
+                temp = RasterLayer.empty_raster_layer_like(calc)
+                calc.save(temp)
+                merged_result = temp
+
+    if merged_result:
+        final = RasterLayer.empty_raster_layer_like(merged_result, filename=output_tif)
+        merged_result.save(final)
+
+def raster_sum(
+    images_list: List[Path],
+    output_filename: str,
+    processes_count: int
+) -> None:
+    result_dir, filename = os.path.split(output_filename)
+    os.makedirs(result_dir, exist_ok=True)
+
+    with tempfile.TemporaryDirectory() as tempdir:
+        with Manager() as manager:
+            source_queue = manager.Queue()
+
+            workers = [Process(target=worker, args=(
+                f"{index}.tif",
+                tempdir,
+                source_queue
+            )) for index in range(processes_count)]
+            for worker_process in workers:
+                worker_process.start()
+
+            for file in images_list:
+                source_queue.put(file)
+            for _ in range(len(workers)):
+                source_queue.put(None)
+
+            processes = workers
+            while processes:
+                candidates = [x for x in processes if not x.is_alive()]
+                for candidate in candidates:
+                    candidate.join()
+                    if candidate.exitcode:
+                        for victim in processes:
+                            victim.kill()
+                        sys.exit(candidate.exitcode)
+                    processes.remove(candidate)
+                time.sleep(1)
+
+            # tempdir should now hold one partial sum per worker; merge those into the final raster
+            single_worker = Process(target=worker, args=(
+                filename,
+                result_dir,
+                source_queue
+            ))
+            single_worker.start()
+            nextfiles = list(Path(tempdir).glob("*.tif"))
+            for file in nextfiles:
+                source_queue.put(file)
+            source_queue.put(None)
+
+            processes = [single_worker]
+            while processes:
+                candidates = [x for x in processes if not x.is_alive()]
+                for candidate in candidates:
+                    candidate.join()
+                    if candidate.exitcode:
+                        for victim in processes:
+                            victim.kill()
+                        sys.exit(candidate.exitcode)
+                    processes.remove(candidate)
+                time.sleep(1)
+
+def reduce_to_next_level(
+    rasters_directory: str,
+    output_directory: str,
+    processes_count: int,
+) -> None:
+
+    files = list(Path(rasters_directory).glob("**/*.tif"))
+    print(f"total items: {len(files)}")
+    if not files:
+        sys.exit(f"No files in {rasters_directory}, aborting")
+
+    buckets = {}
+    for filename in files:
+        code, _ = os.path.splitext(filename.name)
+        next_level_threat_id = ".".join(code.split('.')[:-1])
+        if not next_level_threat_id:
+            next_level_threat_id = "top"
+        try:
+            buckets[next_level_threat_id].append(filename)
+        except KeyError:
+            buckets[next_level_threat_id] = [filename]
+
+    print(f"Found {len(buckets)} threats at current level:")
+    for code, files in buckets.items():
+        target_output = os.path.join(output_directory, f"{code}.tif")
+        print(f"processing {code}: {len(files)} items")
+        raster_sum(files, target_output, processes_count)
+
+def reduce_from_species(
+    rasters_directory: str,
+    output_directory: str,
+    processes_count: int,
+) -> None:
+
+    files = list(Path(rasters_directory).glob("**/*.tif"))
+    print(f"total items: {len(files)}")
+    if not files:
+        sys.exit(f"No files in {rasters_directory}, aborting")
+
+    buckets = {}
+    for filename in files:
+        threat_code = filename.parts[-2]
+        levels = threat_code.split('.')
+        assert len(levels) > 1  # in practice all species threats are level 2 or level 3
+
+        match len(levels):
+            case 2 | 3:
+                code = ".".join(levels[:2])
+            case _:
+                assert False
+        try:
+            buckets[code].append(filename)
+        except KeyError:
+            buckets[code] = [filename]
+
+    print(f"Found {len(buckets)} threats at current level:")
+    for code, files in buckets.items():
+        target_output = os.path.join(output_directory, f"{code}.tif")
+        print(f"processing {code}: {len(files)} items")
+        raster_sum(files, target_output, processes_count)
+
+
+def threat_summation(
+    rasters_directory: str,
+    output_directory: str,
+    processes_count: int,
+) -> None:
+    os.makedirs(output_directory, exist_ok=True)
+
+    # All these files are at level 3 to start with, so first make level 2
+    print("processing level 2")
+    level2_target = os.path.join(output_directory, "level2")
+    reduce_from_species(rasters_directory, level2_target, processes_count)
+
+    # Now reduce level 2 to level 1
+    print("processing level 1")
+    level1_target = os.path.join(output_directory, "level1")
+    reduce_to_next_level(level2_target, level1_target, processes_count)
+
+    # Now build a final top level STAR
+    print("processing level 0")
+    final_target = os.path.join(output_directory, "level0")
+    reduce_to_next_level(level1_target, final_target, processes_count)
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(description="Generates the combined, level 1, and level 2 threat rasters.")
+    parser.add_argument(
+        "--threat_rasters",
+        type=str,
+        required=True,
+        dest="rasters_directory",
+        help="Directory containing the level 3 per-species threat rasters"
+    )
+    parser.add_argument(
+        "--output",
+        type=str,
+        required=True,
+        dest="output_directory",
+        help="Destination directory for results."
+    )
+    parser.add_argument(
+        "-j",
+        type=int,
+        required=False,
+        default=round(cpu_count() / 2),
+        dest="processes_count",
+        help="Number of concurrent processes to use."
+    )
+    args = parser.parse_args()
+
+    threat_summation(
+        args.rasters_directory,
+        args.output_directory,
+        args.processes_count
+    )
+
+if __name__ == "__main__":
+    main()
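The two reduction passes rely on IUCN threat codes being hierarchical: dotted codes such as "3.4.2" roll up to "3.4", then "3", then a single top-level layer. A sketch of the parent-code rule that reduce_to_next_level applies to filenames, under that assumption:

    def next_level(code: str) -> str:
        # "3.4.2" -> "3.4", "3.4" -> "3", "3" -> "top" (the all-threat STAR layer)
        parent = ".".join(code.split(".")[:-1])
        return parent if parent else "top"

    assert next_level("3.4.2") == "3.4"
    assert next_level("3.4") == "3"
    assert next_level("3") == "top"

Each bucket is then summed with raster_sum, so every pass shrinks the hierarchy by one level until only the combined layer remains.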
diff --git a/utils/threats_generator.py b/utils/threats_generator.py
new file mode 100644
index 0000000..a896861
--- /dev/null
+++ b/utils/threats_generator.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3
+
+import argparse
+import os
+
+import pandas as pd
+
+def threats_generator(
+    input_dir: str,
+    data_dir: str,
+    output_csv_path: str
+):
+    taxas = os.listdir(input_dir)
+
+    res = []
+    for taxa in taxas:
+        for scenario in ['current',]:
+            source = 'historic' if scenario == 'pnv' else 'current'
+            taxa_path = os.path.join(input_dir, taxa, source)
+            species_infos = os.listdir(taxa_path)
+            for species_info_path in species_infos:
+                taxon_id, _ = os.path.splitext(species_info_path)
+                aoh_path = os.path.join(data_dir, "aohs", source, taxa, f"{taxon_id}_all.tif")
+                if os.path.exists(aoh_path):
+                    res.append([
+                        os.path.join(data_dir, "species-info", taxa, source, species_info_path),
+                        aoh_path,
+                        os.path.join(data_dir, "threat_rasters", taxa)
+                    ])
+
+    df = pd.DataFrame(res, columns=[
+        '--speciesdata',
+        '--aoh',
+        '--output'
+    ])
+    df.to_csv(output_csv_path, index=False)
+
+def main() -> None:
+    parser = argparse.ArgumentParser(description="Threat task list generator.")
+    parser.add_argument(
+        '--input',
+        type=str,
+        help="directory with per-taxa folders of species info",
+        required=True,
+        dest="input_dir"
+    )
+    parser.add_argument(
+        '--datadir',
+        type=str,
+        help="root data directory containing aohs and species-info",
+        required=True,
+        dest="data_dir",
+    )
+    parser.add_argument(
+        '--output',
+        type=str,
+        help="path of the output CSV file",
+        required=True,
+        dest="output"
+    )
+    args = parser.parse_args()
+
+    threats_generator(args.input_dir, args.data_dir, args.output)
+
+if __name__ == "__main__":
+    main()
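For context, the CSV written by threats_generator is a task list whose column headers match the flags of threats/threat_processing.py. A hypothetical sketch of consuming one row (the paths are illustrative, not from the source; how the CSV is actually dispatched is not shown in this patch):

    import subprocess

    # One hypothetical row of the generated CSV, keyed by its column headers.
    row = {
        "--speciesdata": "/data/species-info/AMPHIBIA/current/12345.geojson",
        "--aoh": "/data/aohs/current/AMPHIBIA/12345_all.tif",
        "--output": "/data/threat_rasters/AMPHIBIA",
    }
    # Flatten the row into CLI arguments and run one per-species task.
    args = [token for flag, value in row.items() for token in (flag, value)]
    subprocess.run(["python3", "threats/threat_processing.py", *args], check=True)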