diff --git a/cartiflette/public/client.py b/cartiflette/public/client.py
index 6bee524d..bb3b6e64 100644
--- a/cartiflette/public/client.py
+++ b/cartiflette/public/client.py
@@ -189,6 +189,8 @@ def get_vectorfile_ign(
     territory: str = "*",
     borders: str = "COMMUNE",
     crs: str = "*",
+    filter_by: str = "origin",
+    value: str = "raw",
     bucket=cartiflette.BUCKET,
     path_within_bucket=cartiflette.PATH_WITHIN_BUCKET,
     type_download: str = "https",
@@ -237,8 +239,8 @@ def get_vectorfile_ign(
         "year": year,
         "borders": borders,
         "crs": crs,
-        "filter_by": None,
-        "value": None,
+        "filter_by": filter_by,
+        "value": value,
         "provider": "cartiflette",
         "dataset_family": dataset_family,
         "source": source,
diff --git a/cartiflette/public/output.py b/cartiflette/public/output.py
index 4bba3135..b44930b5 100644
--- a/cartiflette/public/output.py
+++ b/cartiflette/public/output.py
@@ -141,8 +141,6 @@ def download_file_single(
     )

     with tempfile.TemporaryDirectory() as tdir:
-        print("=" * 50)
-        print(tdir)
         if type_download == "bucket":
             try:
                 if not fs.exists(url):
@@ -161,9 +159,6 @@ def download_file_single(
                 else:
                     local_path = f"{tdir}/{os.path.basename(url)}"

-                    print("-" * 50)
-                    print(url)
-                    print(local_path)
                     fs.download(url, local_path)

         else:
diff --git a/cartiflette/s3/preprocess.py b/cartiflette/s3/preprocess.py
index 6ba0aa6f..e0e3bfd5 100644
--- a/cartiflette/s3/preprocess.py
+++ b/cartiflette/s3/preprocess.py
@@ -1,20 +1,22 @@
 # -*- coding: utf-8 -*-

 from datetime import date
+from functools import partial
 import geopandas as gpd
 import io
 import logging
 import numpy as np
 import os
 import pandas as pd
+from pebble import ThreadPool
 import s3fs
 import tempfile
 from typing import TypedDict

-from cartiflette import BUCKET, PATH_WITHIN_BUCKET, FS
+from cartiflette import BUCKET, PATH_WITHIN_BUCKET, FS, THREADS_DOWNLOAD
 from cartiflette.utils import magic_csv_reader, create_path_bucket
-from cartiflette.public import get_vectorfile_ign
+from cartiflette.public import get_vectorfile_ign, download_file_single

 logger = logging.getLogger(__name__)


@@ -61,53 +63,60 @@ def store_cog_year(
         "PAYS",
     ]

+    provider = "Insee"
+    dataset_family = "COG"
+
     dict_cog = {}
-    for level in levels:
-        pattern = (
-            f"{bucket}/{path_within_bucket}/{year=}/**/"
-            f"provider=Insee/dataset_family=COG/source={level}/**/*.*"
-        )
-        files = fs.glob(pattern)  # , refresh=True)
-        # see issue : https://github.com/fsspec/s3fs/issues/504
-        data = []
-        for file in files:
-            with fs.open(file, "rb") as f:
-                dummy = io.BytesIO(f.read())
-                df = magic_csv_reader(dummy)
-                data.append(df)
-        if data:
-            dict_cog[level] = pd.concat(data)
-        else:
-            dict_cog[level] = pd.DataFrame()
-
-        for ext, method, kwargs in [
-            ("parquet", "to_parquet", {}),
-            ("csv", "to_csv", {"encoding": "utf8"}),
-        ]:
-            config_dict = {
-                "bucket": bucket,
-                "path_within_bucket": path_within_bucket,
-                "year": year,
-                "borders": level,
-                "crs": None,
-                "filter_by": None,
-                "value": None,
-                "file_format": ext,
-                "provider": "cartiflette",
-                "dataset_family": "COG",
-                "source": level,
-                "territory": "france_entiere",
-                "filename": f"{level}.{ext}",
-            }
-            path = create_path_bucket(config=config_dict)
-            with fs.open(path, "wb") as f:
-                getattr(dict_cog[level], method)(f, **kwargs)
+    for source in levels:
+        try:
+            pattern = (
+                f"{bucket}/{path_within_bucket}/"
+                f"{provider=}/{dataset_family=}/{source=}/{year=}/**/*.*"
+            ).replace("'", "")
+            files = fs.glob(pattern)  # , refresh=True)
+            # see issue : https://github.com/fsspec/s3fs/issues/504
+            data = []
+            for file in files:
+                with fs.open(file, "rb") as f:
+                    dummy = io.BytesIO(f.read())
+                    df = magic_csv_reader(dummy)
+                    data.append(df)
+            if data:
+                dict_cog[source] = pd.concat(data)
+                dict_cog[source]["source"] = f"{provider}:{source}"
+            else:
+                dict_cog[source] = pd.DataFrame()
+
+            for ext, method, kwargs in [
+                ("parquet", "to_parquet", {}),
+                # ("csv", "to_csv", {"encoding": "utf8"}),
+            ]:
+                config_dict = {
+                    "bucket": bucket,
+                    "path_within_bucket": path_within_bucket,
+                    "year": year,
+                    "borders": source,
+                    "crs": None,
+                    "filter_by": "origin",
+                    "value": "preprocessed",
+                    "file_format": ext,
+                    "provider": "cartiflette",
+                    "dataset_family": "COG",
+                    "source": source,
+                    "territory": "france_entiere",
+                    "filename": f"{source}.{ext}",
+                }
+                path = create_path_bucket(config=config_dict)
+                with fs.open(path, "wb") as f:
+                    getattr(dict_cog[source], method)(f, **kwargs)
+        except Exception as e:
+            logger.error(e)


 def store_cog_ign(
+    borders: str = "COMMUNE",
     year: str = None,
     territory: str = "metropole",
-    borders: str = "COMMUNE",
     bucket: str = BUCKET,
     path_within_bucket: str = PATH_WITHIN_BUCKET,
     fs: s3fs.S3FileSystem = FS,
@@ -142,76 +151,82 @@
     ------
     ValueError
         - If a wildcard is used on an unallowed argument
-        - If the dataset is not found on MinIO
+        - If the source dataset is not found on MinIO

     Returns
     -------
     None

     """
+    logger.info(f"IGN AdminExpress {year} {borders=}")
+    try:
+        if not year:
+            year = date.today().year
+        elif year == "*":
+            raise ValueError(f"Cannot use a * wildcard on {year=}")
+
+        dataset_family = "ADMINEXPRESS"
+        source = "EXPRESS-COG-TERRITOIRE"
+        provider = "IGN"
+        pattern = (
+            f"{bucket}/{path_within_bucket}/"
+            f"{provider=}/{dataset_family=}/{source=}/{year=}/**/{territory=}"
+            f"/**/{borders}.shp"
+        ).replace("'", "")
+        files = fs.glob(pattern)  # , refresh=True)
+        # see issue : https://github.com/fsspec/s3fs/issues/504
+        if not files:
+            raise ValueError(
+                "No file retrieved with the set parameters, resulting to the "
+                f"following {pattern=}"
+            )

-    if not year:
-        year = date.today().year
-    elif year == "*":
-        raise ValueError(f"Cannot use a * wildcard on {year=}")
-
-    dataset_family = "ADMINEXPRESS"
-    source = "EXPRESS-COG-TERRITOIRE"
-    provider = "IGN"
-    pattern = (
-        f"{bucket}/{path_within_bucket}/{year=}/**/"
-        f"{provider=}/{dataset_family=}/{source=}/{territory=}/**/"
-        f"{borders}.shp"
-    ).replace("'", "")
-    files = fs.glob(pattern)  # , refresh=True)
-    # see issue : https://github.com/fsspec/s3fs/issues/504
-    if not files:
-        raise ValueError(
-            "No file retrieved with the set parameters, resulting to the "
-            f"following {pattern=}"
-        )
-
-    data = []
-    for file in files:
-        logger.info(f"retrieving {file=}")
-        with tempfile.TemporaryDirectory() as tempdir:
-            pattern = file.rsplit(".", maxsplit=1)[0]
-            all_files = fs.glob(pattern + ".*")  # , refresh=True)
-            # see issue : https://github.com/fsspec/s3fs/issues/504
-            for temp in all_files:
-                with open(
-                    os.path.join(tempdir, os.path.basename(temp)), "wb"
-                ) as tf:
-                    with fs.open(temp, "rb") as fsf:
-                        tf.write(fsf.read())
-            gdf = gpd.read_file(os.path.join(tempdir, os.path.basename(file)))
-
-            gdf = gdf.to_crs(4326)
-            data.append(gdf)
-    gdf = gpd.pd.concat(data)
-
-    if borders == "ARRONDISSEMENT_MUNICIPAL":
-        gdf["INSEE_DEP"] = gdf["INSEE_COM"].str[:2]
-
-    gdf["source"] = f"{provider}:{source}"
-
-    config_dict = {
-        "bucket": bucket,
-        "path_within_bucket": path_within_bucket,
-        "year": year,
-        "borders": borders,
-        "crs": gdf.crs.to_epsg(),
-        "filter_by": None,
-        "value": None,
-        "file_format": "GPKG",
-        "provider": "cartiflette",
-        "dataset_family": dataset_family,
-        "source": source,
-        "territory": territory if territory != "*" else "france_entiere",
-        "filename": f"{borders}.gpkg",
-    }
-    path = create_path_bucket(config=config_dict)
-    with fs.open(path, "wb") as f:
-        gdf.to_file(f, driver="GPKG", encoding="utf8")
+        data = []
+        for file in files:
+            logger.info(f"retrieving {file=}")
+            with tempfile.TemporaryDirectory() as tempdir:
+                pattern = file.rsplit(".", maxsplit=1)[0]
+                all_files = fs.glob(pattern + ".*")  # , refresh=True)
+                # see issue : https://github.com/fsspec/s3fs/issues/504
+                for temp in all_files:
+                    with open(
+                        os.path.join(tempdir, os.path.basename(temp)), "wb"
+                    ) as tf:
+                        with fs.open(temp, "rb") as fsf:
+                            tf.write(fsf.read())
+                gdf = gpd.read_file(
+                    os.path.join(tempdir, os.path.basename(file))
+                )
+
+                gdf = gdf.to_crs(4326)
+                data.append(gdf)
+        gdf = gpd.pd.concat(data)
+
+        if borders == "ARRONDISSEMENT_MUNICIPAL":
+            gdf["INSEE_DEP"] = gdf["INSEE_COM"].str[:2]
+
+        gdf["source"] = f"{provider}:{source}"
+        gdf["source_geometries"] = f"{provider}:{source}"
+
+        config_dict = {
+            "bucket": bucket,
+            "path_within_bucket": path_within_bucket,
+            "year": year,
+            "borders": borders,
+            "crs": gdf.crs.to_epsg(),
+            "filter_by": "origin",
+            "value": "preprocessed",
+            "file_format": "GPKG",
+            "provider": "cartiflette",
+            "dataset_family": dataset_family,
+            "source": source,
+            "territory": territory if territory != "*" else "france_entiere",
+            "filename": f"{borders}.gpkg",
+        }
+        path = create_path_bucket(config=config_dict)
+        with fs.open(path, "wb") as f:
+            gdf.to_file(f, driver="GPKG", encoding="utf8")
+    except Exception as e:
+        logger.error(e)


 def store_vectorfile_communes_arrondissement(
@@ -244,33 +259,47 @@
     if not year:
         year = date.today().year

-    provider = "IGN"
-    source = "EXPRESS-COG-TERRITOIRE"
-
-    arrondissements = get_vectorfile_ign(
-        borders="ARRONDISSEMENT_MUNICIPAL",
-        year=year,
-        territory="metropole",
-        provider=provider,
-        source=source,
-        type_download="bucket",
-        crs="4326",
-    )
+    kwargs = {
+        "year": year,
+        "crs": "4326",
+        "filter_by": "origin",
+        "value": "preprocessed",
+        "file_format": "GPKG",
+        "provider": "cartiflette",
+        "source": "EXPRESS-COG-TERRITOIRE",
+        "dataset_family": "ADMINEXPRESS",
+        "territory": "france_entiere",
+        "type_download": "bucket",
+        "fs": fs,
+        "bucket": bucket,
+        "path_within_bucket": path_within_bucket,
+    }

-    communes = get_vectorfile_ign(
-        borders="COMMUNE",
-        year=year,
-        territory="france_entiere",
-        provider=provider,
-        source=source,
-        type_download="bucket",
-        crs="4326",
-    )
+    try:
+        arrondissements = download_file_single(
+            borders="ARRONDISSEMENT_MUNICIPAL",
+            filename="ARRONDISSEMENT_MUNICIPAL.gpkg",
+            **kwargs,
+        )
+
+        communes = download_file_single(
+            borders="COMMUNE",
+            filename="COMMUNE.gpkg",
+            **kwargs,
+        )
+    except Exception as e:
+        logger.error(e)
+
+    field = "NOM"
+    try:
+        communes[field]
+    except KeyError:
+        field = "NOM_COM"

     communes_sans_grandes_villes = communes.loc[
-        ~communes["NOM"].isin(["Marseille", "Lyon", "Paris"])
+        ~communes[field].isin(["Marseille", "Lyon", "Paris"])
     ]
     communes_grandes_villes = communes.loc[
-        communes["NOM"].isin(["Marseille", "Lyon", "Paris"])
+        communes[field].isin(["Marseille", "Lyon", "Paris"])
     ]

     arrondissement_extra_info = arrondissements.merge(
@@ -284,33 +313,40 @@
         [communes_sans_grandes_villes, arrondissement_extra_info]
     )

+    try:
+        field = "INSEE_ARM"
+        gdf_enrichi[field]
+    except KeyError:
+        field = "INSEE_RATT"
     gdf_enrichi["INSEE_COG"] = np.where(
-        gdf_enrichi["INSEE_ARM"].isnull(),
+        gdf_enrichi[field].isnull(),
         gdf_enrichi["INSEE_COM"],
-        gdf_enrichi["INSEE_ARM"],
+        gdf_enrichi[field],
     )

-    gdf_enrichi = gdf_enrichi.drop("INSEE_ARM", axis="columns")
-
-    # TODO: store on s3fs
-    config_dict = {
-        "bucket": bucket,
-        "path_within_bucket": path_within_bucket,
-        "year": year,
-        "borders": "COMMUNE",
-        "crs": gdf_enrichi.crs.to_epsg(),
-        "filter_by": None,
-        "value": None,
-        "file_format": "GPKG",
-        "provider": "cartiflette",
-        "dataset_family": "COG",
-        "source": source,
-        "territory": "france_entiere",
-        "filename": "COMMUNE_ARRONDISSEMENTS_MUNICIPAUX.gpkg",
-    }
-    path = create_path_bucket(config=config_dict)
-    with fs.open(path, "wb") as f:
-        gdf_enrichi.to_file(f, driver="GPKG", encoding="utf8")
+    gdf_enrichi = gdf_enrichi.drop(field, axis="columns")
+
+    try:
+        config_dict = {
+            "bucket": bucket,
+            "path_within_bucket": path_within_bucket,
+            "year": year,
+            "borders": "COMMUNE_ARRONDISSEMENTS_MUNICIPAUX",
+            "crs": gdf_enrichi.crs.to_epsg(),
+            "filter_by": "origin",
+            "value": "preprocessed",
+            "file_format": "GPKG",
+            "provider": "cartiflette",
+            "dataset_family": "ADMINEXPRESS",
+            "source": "COMMUNE_ARRONDISSEMENTS_MUNICIPAUX",
+            "territory": "france_entiere",
+            "filename": "COMMUNE_ARRONDISSEMENTS_MUNICIPAUX.gpkg",
+        }
+        path = create_path_bucket(config=config_dict)
+        with fs.open(path, "wb") as f:
+            gdf_enrichi.to_file(f, driver="GPKG", encoding="utf8")
+    except Exception as e:
+        logger.error(e)


 def store_living_area(
@@ -369,101 +405,180 @@
     if not year:
         year = date.today().year

-    territory = "france_entiere"
-    provider = "Insee"
-    dataset_family = "BV"
-    source = bv_source
-    pattern = (
-        f"{bucket}/{path_within_bucket}/{year=}/**/"
-        f"{provider=}/{dataset_family=}/{source=}/{territory=}/**/"
-        f"*.dbf"
-    ).replace("'", "")
-    files = fs.glob(pattern)  # , refresh=True)
-    # see issue : https://github.com/fsspec/s3fs/issues/504
-    if not files:
-        raise ValueError(
-            "No file retrieved with the set parameters, resulting to the "
-            f"following {pattern=}"
+    try:
+        territory = "france_entiere"
+        provider = "Insee"
+        dataset_family = "BV"
+        source = bv_source
+        pattern = (
+            f"{bucket}/{path_within_bucket}/"
+            f"{provider=}/{dataset_family=}/{source=}/{year=}/**/{territory=}"
+            f"/**/*.dbf"
+        ).replace("'", "")
+        files = fs.glob(pattern)  # , refresh=True)
+        # see issue : https://github.com/fsspec/s3fs/issues/504
+        if not files:
+            raise ValueError(
+                "No file retrieved with the set parameters, resulting to the "
+                f"following {pattern=}"
+            )
+        data = []
+        for file in files:
+            with tempfile.TemporaryDirectory() as tempdir:
+                tmp_dbf = os.path.join(tempdir, os.path.basename(file))
+                with open(tmp_dbf, "wb") as tf:
+                    with fs.open(file, "rb") as fsf:
+                        tf.write(fsf.read())
+
+                df = gpd.read_file(tmp_dbf, encoding="utf8")
+                df = df.drop("geometry", axis=1)
+                data.append(df)
+
+        bv = pd.concat(data)
+
+        kwargs = {
+            "year": year,
+            "crs": "4326",
+            "filter_by": "origin",
+            "value": "preprocessed",
+            "file_format": "GPKG",
+            "provider": "cartiflette",
+            "source": "EXPRESS-COG-TERRITOIRE",
+            "dataset_family": "ADMINEXPRESS",
+            "territory": "france_entiere",
+            "type_download": "bucket",
+            "fs": fs,
+            "bucket": bucket,
+            "path_within_bucket": path_within_bucket,
+        }
+        communes = download_file_single(
+            borders="COMMUNE",
+            filename="COMMUNE.gpkg",
+            **kwargs,
         )
-    data = []
-    for file in files:
-        with tempfile.TemporaryDirectory() as tempdir:
-            tmp_dbf = os.path.join(tempdir, os.path.basename(file))
-            with open(tmp_dbf, "wb") as tf:
-                with fs.open(file, "rb") as fsf:
-                    tf.write(fsf.read())
-
-            df = gpd.read_file(tmp_dbf, encoding="utf8")
-            df = df.drop("geometry", axis=1)
-            data.append(df)
-
-    bv = pd.concat(data)
-
-    ign_source = "EXPRESS-COG-TERRITOIRE"
-    communes = store_cog_ign(
-        borders="COMMUNE",
-        year=year,
-        territory="*",
-        provider="IGN",
-        source=ign_source,
-    )
-    bv = communes.merge(
-        bv, left_on="INSEE_COM", right_on="codgeo", how="right"
-    )
-    if bv_source == "FondsDeCarte_BV_2022":
-        rename = ["bv2022", "libbv2022"]
-    elif bv_source == "FondsDeCarte_BV_2012":
-        rename = ["bv2012", "libbv2012"]
-    bv = bv.rename(dict(zip(rename, ["bv", "libbv"])), axis=1)
+        bv = communes.merge(
+            bv, left_on="INSEE_COM", right_on="codgeo", how="right"
+        )
+        bv["source"] = f"{provider}:{source}"
+
+        if bv_source == "FondsDeCarte_BV_2022":
+            rename = ["bv2022", "libbv2022"]
+        elif bv_source == "FondsDeCarte_BV_2012":
+            rename = ["bv2012", "libbv2012"]
+        bv = bv.rename(dict(zip(rename, ["bv", "libbv"])), axis=1)
+
+        config_dict = {
+            "bucket": bucket,
+            "path_within_bucket": path_within_bucket,
+            "year": year,
+            "borders": "COMMUNE",
+            "crs": bv.crs.to_epsg(),
+            "filter_by": "origin",
+            "value": "preprocessed",
+            "file_format": "GPKG",
+            "provider": "cartiflette",
+            "dataset_family": f"bassins-vie-{bv_source.split('_')[-1]}",
+            "source": "BV",
+            "filename": "bassins_vie.gpkg",
+        }
+        path = create_path_bucket(config=config_dict)
+        with fs.open(path, "wb") as f:
+            bv.to_file(f, driver="GPKG")
+
+        by = ["bv", "libbv", "dep", "reg"]
+
+        bv = bv.dissolve(
+            by=by, aggfunc={"POPULATION": "sum"}, as_index=False, dropna=False
+        )

-    config_dict = {
+        config_dict = {
+            "bucket": bucket,
+            "path_within_bucket": path_within_bucket,
+            "year": year,
+            "borders": "BASSIN-VIE",
+            "crs": bv.crs.to_epsg(),
+            "filter_by": "origin",
+            "value": "preprocessed",
+            "file_format": "GPKG",
+            "provider": "cartiflette",
+            "dataset_family": f"bassins-vie-{bv_source.split('_')[-1]}",
+            "source": "BV",
+            "territory": "france_entiere",
+            "filename": "bassins_vie.gpkg",
+        }
+        path = create_path_bucket(config=config_dict)
+        with fs.open(path, "wb") as f:
+            bv.to_file(f, driver="GPKG")
+
+        return
+
+    except Exception as e:
+        logger.error(e)
+
+
+def preprocess_one_year(
+    year: int,
+    bucket: str = BUCKET,
+    path_within_bucket: str = PATH_WITHIN_BUCKET,
+    fs: s3fs.S3FileSystem = FS,
+):
+    # TODO : docstring, preprocessing par année à cause des dépendances
+    # entre jeux de données
+    kwargs = {
         "bucket": bucket,
         "path_within_bucket": path_within_bucket,
-        "year": year,
-        "borders": "COMMUNE",
-        "crs": bv.crs.to_epsg(),
-        "filter_by": None,
-        "value": None,
-        "file_format": "GPKG",
-        "provider": "cartiflette",
-        "dataset_family": "bassins-vie",
-        "source": "BV",
-        "filename": "bassins_vie.gpkg",
+        "fs": fs,
     }
-    path = create_path_bucket(config=config_dict)
-    with fs.open(path, "wb") as f:
-        bv.to_file(f, driver="GPKG")
-
-    by = ["bv", "libbv", "dep", "reg"]
-
-    bv = bv.dissolve(
-        by=by, aggfunc={"POPULATION": "sum"}, as_index=False, dropna=False
+    logger.info(f"Insee COG {year=}")
+    store_cog_year(year=year, **kwargs)
+
+    # Concaténation d'AdminExpress Communal (et formatage identique des
+    # arrondissements municipaux pour reprojection / ajout source en vue
+    # de l'étape de préparation du mix communes/arrondissements)
+
+    func = partial(store_cog_ign, year=year, territory="*", **kwargs)
+    if THREADS_DOWNLOAD > 1:
+        with ThreadPool(2) as pool:
+            iterator = pool.map(
+                func, ["COMMUNE", "ARRONDISSEMENT_MUNICIPAL"]
+            ).result()
+            while True:
+                try:
+                    next(iterator)
+                except StopIteration:
+                    break
+                except Exception as e:
+                    logger.error(e)
+    else:
+        for source in [
+            "COMMUNE",
+            "ARRONDISSEMENT_MUNICIPAL",
+        ]:
+            store_cog_ign(
+                year=year,
+                territory="*",
+                borders=source,
+                **kwargs,
+            )
+
+    # Préparation des AdminExpress enrichis avec arrondissements municipaux
+    logger.info(
+        f"IGN AdminExpress mixup {year} , "
+        "sources=['COMMUNE', 'ARRONDISSEMENT_MUNICIPAL']"
    )
+    store_vectorfile_communes_arrondissement(year=year, **kwargs)

-    config_dict = {
-        "bucket": bucket,
-        "path_within_bucket": path_within_bucket,
-        "year": year,
-        "borders": "BASSIN-VIE",
-        "crs": bv.crs.to_epsg(),
-        "filter_by": None,
-        "value": None,
-        "file_format": "GPKG",
-        "provider": "cartiflette",
-        "dataset_family": f"bassins-vie-{bv_source.split('_')[-1]}",
-        "source": "BV",
-        "territory": "france_entiere",
-        "filename": "bassins_vie.gpkg",
-    }
-    path = create_path_bucket(config=config_dict)
-    with fs.open(path, "wb") as f:
-        bv.to_file(f, driver="GPKG")
+    if year >= 2022:
+        logger.info(f"INSEE living area v2022 in {year}")
+        store_living_area(year, "FondsDeCarte_BV_2022", **kwargs)

-    return bv
+    if 2012 <= year <= 2022:
+        logger.info(f"INSEE living area v2012 in {year}")
+        store_living_area(year, "FondsDeCarte_BV_2012", **kwargs)


-def simili_pipeline(
+def preprocess_pipeline(
     bucket: str = BUCKET,
     path_within_bucket: str = PATH_WITHIN_BUCKET,
     fs: s3fs.S3FileSystem = FS,
@@ -474,34 +589,26 @@
         "fs": fs,
     }

-    logger.error("ATTENTION, YEARS EST FORCE A 2022 POUR PREPROCESSING")
-    # years = list(range(2015, date.today().year + 1))[-1::-1]
-    years = [2022]
+    years = list(range(2015, date.today().year + 1))[-1::-1]

     logger.info("Preprocess raw sources")

-    for year in years:
-        # Préprocessing du COG INSEE
-        logger.info(f"COG INSEE {year}")
-        store_cog_year(year=year, **kwargs)
-
-        # Concaténation d'AdminExpress Communal
-        store_cog_ign(
-            year=year,
-            territory="*",
-            borders="COMMUNE",
-            **kwargs,
-        )
-
-        # Préparation des AdminExpress enrichis avec arrondissements municipaux
-        store_vectorfile_communes_arrondissement(year=year, **kwargs)
-        if year >= 2022:
-            store_living_area(year, "FondsDeCarte_BV_2022", **kwargs)
+    if THREADS_DOWNLOAD > 1:
+        with ThreadPool(THREADS_DOWNLOAD) as pool:
+            iterator = pool.map(preprocess_one_year, years).result()
+            while True:
+                try:
+                    next(iterator)
+                except StopIteration:
+                    break
+                except Exception as e:
+                    logger.error(e)

-        if 2012 <= year <= 2022:
-            store_living_area(year, "FondsDeCarte_BV_2012", **kwargs)
+    else:
+        for year in years:
+            preprocess_one_year(year, **kwargs)


 if __name__ == "__main__":
     logging.basicConfig(level=logging.INFO)
-    simili_pipeline()
+    preprocess_pipeline()