diff --git a/CHANGES.txt b/CHANGES.txt
index be150fa..5ffbdeb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,12 @@
 Changes
 =======
 
+0.10.0 (2024-02-19)
+-------------------
+- store known primary keys in repository (data/primary_keys.json)
+- add new function bcdata.get_primary_keys(), use remote url as default primary key lookup for bc2pg (#163)
+- --refresh option cleanup, now available only via CLI
+
 0.9.2 (2024-02-18)
 ------------------
 - bc2pg geometry type inspection - look for geometries at bottom of dataset when none present at top (#158)
diff --git a/bcdata/__init__.py b/bcdata/__init__.py
index 2ef5ff0..5fa3bd9 100644
--- a/bcdata/__init__.py
+++ b/bcdata/__init__.py
@@ -1,4 +1,4 @@
-from .bc2pg import bc2pg
+from .bc2pg import bc2pg, get_primary_keys
 from .bcdc import get_table_definition, get_table_name
 from .wcs import get_dem
 from .wfs import (
@@ -10,4 +10,8 @@
     validate_name,
 )
 
+PRIMARY_KEY_DB_URL = (
+    "https://raw.githubusercontent.com/smnorris/bcdata/main/data/primary_keys.json"
+)
+
 __version__ = "0.10.0dev0"
diff --git a/bcdata/bc2pg.py b/bcdata/bc2pg.py
index 0f335ef..bacb34b 100644
--- a/bcdata/bc2pg.py
+++ b/bcdata/bc2pg.py
@@ -1,3 +1,4 @@
+import json
 import logging
 import os
 
@@ -5,6 +6,7 @@
 import numpy
 import stamina
 from geoalchemy2 import Geometry
+import requests
 from shapely.geometry.linestring import LineString
 from shapely.geometry.multilinestring import MultiLineString
 from shapely.geometry.multipoint import MultiPoint
@@ -32,6 +34,19 @@
 ]
 
 
+def get_primary_keys():
+    """download primary key data file"""
+    response = requests.get(bcdata.PRIMARY_KEY_DB_URL)
+    if response.status_code == 200:
+        primary_keys = response.json()
+    else:
+        log.warning(
+            f"Failed to download primary key database at {bcdata.PRIMARY_KEY_DB_URL}"
+        )
+        primary_keys = {}
+    return primary_keys
+
+
 def bc2pg(  # noqa: C901
     dataset,
     db_url,
@@ -45,11 +60,10 @@ def bc2pg(  # noqa: C901
     timestamp=True,
     schema_only=False,
     append=False,
-    refresh=False,
 ):
     """Request table definition from bcdc and replicate in postgres"""
-    if append and refresh:
-        raise ValueError("Options append and refresh are not compatible")
+    if schema_only and append:
+        raise ValueError("Options schema_only and append are not compatible")
     dataset = bcdata.validate_name(dataset)
     schema_name, table_name = dataset.lower().split(".")
 
@@ -75,18 +89,14 @@
 
     df = None  # just for tracking if first download is done by geometry type check
 
-    # if appending or refreshing, get column names from db, make sure table exists
-    if append or refresh:
+    # if appending, get column names from db, make sure table exists
+    if append:
         if schema_name + "." + table_name not in db.tables:
             raise ValueError(f"{schema_name}.{table_name} does not exist")
         column_names = db.get_columns(schema_name, table_name)
 
-        # clear existing data if directed by refresh option
-        if refresh:
-            db.truncate(schema_name, table_name)
-
-    # if not appending/refreshing, define and create table
-    if not append or refresh:
+    # if not appending, define and create table
+    if not append:
         # get info about the table from catalogue
         table_definition = bcdata.get_table_definition(dataset)
 
@@ -132,6 +142,12 @@
         if geometry_type not in SUPPORTED_TYPES:
             raise ValueError("Geometry type {geometry_type} is not supported")
 
+        # if primary key is not supplied, use default (if present in list)
+        primary_keys = get_primary_keys()
+        if not primary_key and dataset.lower() in primary_keys:
+            primary_key = primary_keys[dataset.lower()]
+
+        # fail if specified primary key is not in the table
         if primary_key and primary_key.upper() not in [
             c["column_name"].upper() for c in table_definition["schema"]
         ]:
diff --git a/bcdata/cli.py b/bcdata/cli.py
index 71ac569..7215e9b 100644
--- a/bcdata/cli.py
+++ b/bcdata/cli.py
@@ -8,6 +8,7 @@
 from cligj import compact_opt, indent_opt, quiet_opt, verbose_opt
 
 import bcdata
+from bcdata.database import Database
 
 LOG_FORMAT = "%(asctime)s:%(levelname)s:%(name)s: %(message)s"
 
@@ -385,6 +386,18 @@ def bc2pg(
         timestamp = False
     else:
         timestamp = True
+    if refresh and append:
+        raise ValueError("Options append and refresh are not compatible")
+    if refresh and (schema == "bcdata"):
+        raise ValueError(
+            "Refreshing tables in bcdata schema is not supported, use another schema"
+        )
+    elif refresh and schema:
+        schema_target = schema
+    elif refresh and not schema:
+        schema_target, t = bcdata.validate_name(dataset).lower().split(".")
+    if refresh:
+        schema = "bcdata"
     out_table = bcdata.bc2pg(
         dataset,
         db_url,
@@ -398,6 +411,13 @@
         timestamp=timestamp,
         schema_only=schema_only,
         append=append,
-        refresh=refresh,
     )
+
+    # if refreshing, flush from temp bcdata schema to target schema
+    if refresh:
+        db = Database(db_url)
+        s, table = out_table.split(".")
+        db.refresh(schema_target, table)
+        out_table = schema_target + "." + table
+
     log.info("Load of {} to {} in {} complete".format(dataset, out_table, db_url))
diff --git a/bcdata/database.py b/bcdata/database.py
index 72939df..3a3e4b7 100644
--- a/bcdata/database.py
+++ b/bcdata/database.py
@@ -92,14 +92,34 @@ def drop_table(self, schema, table):
         )
         self.execute(dbq)
 
-    def truncate(self, schema, table):
+    def refresh(self, schema, table):
+        # move data from temp table to target table
         if schema + "." + table in self.tables:
-            log.warning(f"Truncating table {schema}.{table}")
+            log.warning(
+                f"Truncating table {schema}.{table} and refreshing from bcdata.{table}"
+            )
             dbq = sql.SQL("TRUNCATE {schema}.{table}").format(
                 schema=sql.Identifier(schema),
                 table=sql.Identifier(table),
             )
             self.execute(dbq)
+            columns = list(
+                set(self.get_columns("bcdata", table)).intersection(
+                    self.get_columns(schema, table)
+                )
+            )
+            identifiers = [sql.Identifier(c) for c in columns]
+            dbq = sql.SQL(
+                """INSERT INTO {schema}.{table}
+                ({columns})
+                SELECT {columns} FROM bcdata.{table}"""
+            ).format(
+                schema=sql.Identifier(schema),
+                table=sql.Identifier(table),
+                columns=sql.SQL(",").join(identifiers),
+            )
+            self.execute(dbq)
+            self.drop_table("bcdata", table)
 
     def define_table(
         self,
diff --git a/tests/test_bc2pg.py b/tests/test_bc2pg.py
index 8a3874d..1950e4e 100644
--- a/tests/test_bc2pg.py
+++ b/tests/test_bc2pg.py
@@ -103,6 +103,29 @@ def test_bc2pg_primary_key():
     DB_CONNECTION.execute("drop table " + ASSESSMENTS_TABLE)
 
 
+def test_bc2pg_get_primary_keys():
+    primary_keys = bcdata.get_primary_keys()
+    assert primary_keys[ASSESSMENTS_TABLE] == "stream_crossing_id"
+
+
+def test_bc2pg_primary_key_default():
+    bcdata.bc2pg(ASSESSMENTS_TABLE, DB_URL, count=100)
+    assert ASSESSMENTS_TABLE in DB_CONNECTION.tables
+    r = DB_CONNECTION.query(
+        """
+        SELECT a.attname FROM pg_index i
+        JOIN pg_class c ON c.oid = i.indrelid
+        JOIN pg_attribute a ON a.attrelid = c.oid AND a.attnum = any(i.indkey)
+        JOIN pg_namespace n ON n.oid = c.relnamespace
+        WHERE relname = 'pscis_assessment_svw'
+        AND nspname = 'whse_fish'
+        AND indisprimary
+        """
+    )
+    assert r[0][0] == "stream_crossing_id"
+    DB_CONNECTION.execute("drop table " + ASSESSMENTS_TABLE)
+
+
 def test_bc2pg_filter():
     bcdata.bc2pg(
         AIRPORTS_TABLE,
@@ -164,21 +187,3 @@ def test_bc2pg_append_to_other():
     r = DB_CONNECTION.query("select * from whse_imagery_and_base_maps.arpt")
     assert len(r) == 2
     DB_CONNECTION.execute("drop table whse_imagery_and_base_maps.arpt")
-
-
-def test_bc2pg_refresh():
-    bcdata.bc2pg(AIRPORTS_TABLE, DB_URL)
-    bcdata.bc2pg(
-        AIRPORTS_TABLE,
-        DB_URL,
-        query="AIRPORT_NAME='Terrace (Northwest Regional) Airport'",
-    )
-    bcdata.bc2pg(
-        AIRPORTS_TABLE,
-        DB_URL,
-        query="AIRPORT_NAME='Victoria International Airport'",
-        refresh=True,
-    )
-    r = DB_CONNECTION.query("select * from whse_imagery_and_base_maps.gsr_airports_svw")
-    assert len(r) == 1
-    DB_CONNECTION.execute("drop table whse_imagery_and_base_maps.gsr_airports_svw")
diff --git a/tests/test_cli.py b/tests/test_cli.py
index cd31dc1..f6793af 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -79,3 +79,36 @@ def test_bc2pg():
     assert result.exit_code == 0
     assert AIRPORTS_TABLE.lower() in DB_CONNECTION.tables
     DB_CONNECTION.execute("drop table " + AIRPORTS_TABLE.lower())
+
+
+def test_bc2pg_refresh():
+    runner = CliRunner()
+    r1 = runner.invoke(
+        cli,
+        [
+            "bc2pg",
+            AIRPORTS_TABLE,
+            "--db_url",
+            DB_URL,
+            "--query",
+            "AIRPORT_NAME='Terrace (Northwest Regional) Airport'",
+        ],
+    )
+    r2 = runner.invoke(
+        cli,
+        [
+            "bc2pg",
+            AIRPORTS_TABLE,
+            "--db_url",
+            DB_URL,
+            "--refresh",
+            "--query",
+            "AIRPORT_NAME='Victoria International Airport'",
+        ],
+    )
+    q = DB_CONNECTION.query("select * from whse_imagery_and_base_maps.gsr_airports_svw")
+    assert r1.exit_code == 0
+    assert r2.exit_code == 0
+    assert AIRPORTS_TABLE.lower() in DB_CONNECTION.tables
+    assert len(q) == 1
+    DB_CONNECTION.execute("drop table " + AIRPORTS_TABLE.lower())
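Reviewer note: a minimal sketch of how the new default primary key lookup is expected to behave from the Python side. `get_primary_keys()`, `bc2pg()`, the `whse_fish.pscis_assessment_svw` dataset, its `stream_crossing_id` key, and the `count` parameter all come from the patch and its tests; the connection string is a placeholder assumption.

```python
import bcdata

# fetch the published lookup of known primary keys
# (served from bcdata.PRIMARY_KEY_DB_URL, i.e. data/primary_keys.json)
primary_keys = bcdata.get_primary_keys()
print(primary_keys.get("whse_fish.pscis_assessment_svw"))  # expected: "stream_crossing_id"

# placeholder connection string - adjust for your own database
db_url = "postgresql://postgres:postgres@localhost:5432/postgis"

# no primary_key argument supplied, so bc2pg should fall back to the
# default listed in primary_keys.json for this dataset
bcdata.bc2pg("whse_fish.pscis_assessment_svw", db_url, count=100)
```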
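The `--refresh` path is now CLI-only: data is first loaded into a temporary `bcdata` schema, then `Database.refresh()` truncates the target table and re-inserts from it before dropping the temp copy. A sketch mirroring the new CLI test, assuming the click group is importable as `bcdata.cli.cli`, using the lowercase object name taken from the test queries and the same placeholder connection string:

```python
from click.testing import CliRunner

from bcdata.cli import cli  # assumed import path for the click group

runner = CliRunner()
db_url = "postgresql://postgres:postgres@localhost:5432/postgis"  # placeholder

# initial load creates whse_imagery_and_base_maps.gsr_airports_svw
runner.invoke(
    cli,
    ["bc2pg", "whse_imagery_and_base_maps.gsr_airports_svw", "--db_url", db_url],
)

# --refresh downloads to the temporary bcdata schema, then truncates and
# repopulates the existing target table
result = runner.invoke(
    cli,
    ["bc2pg", "whse_imagery_and_base_maps.gsr_airports_svw", "--db_url", db_url, "--refresh"],
)
assert result.exit_code == 0
```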