Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue163 #164

Merged
merged 4 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
Changes
=======

0.10.0 (2024-02-19)
------------------
- store known primary keys in repository (data/primary_keys.json)
- add new function bcdata.get_primary_keys(), use remote url as default primary key lookup for bc2pg (#163)
- --refresh option cleanup, now available only via CLI

0.9.2 (2024-02-18)
------------------
- bc2pg geometry type inspection - look for geometries at bottom of dataset when none present at top (#158)
Expand Down
6 changes: 5 additions & 1 deletion bcdata/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .bc2pg import bc2pg
from .bc2pg import bc2pg, get_primary_keys
from .bcdc import get_table_definition, get_table_name
from .wcs import get_dem
from .wfs import (
Expand All @@ -10,4 +10,8 @@
validate_name,
)

PRIMARY_KEY_DB_URL = (
"https://raw.githubusercontent.com/smnorris/bcdata/main/data/primary_keys.json"
)

__version__ = "0.10.0dev0"
38 changes: 27 additions & 11 deletions bcdata/bc2pg.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import json
import logging
import os

import geopandas as gpd
import numpy
import stamina
from geoalchemy2 import Geometry
import requests
from shapely.geometry.linestring import LineString
from shapely.geometry.multilinestring import MultiLineString
from shapely.geometry.multipoint import MultiPoint
Expand Down Expand Up @@ -32,6 +34,19 @@
]


def get_primary_keys():
"""download primary key data file"""
response = requests.get(bcdata.PRIMARY_KEY_DB_URL)
if response.status_code == 200:
primary_keys = response.json()
else:
log.warning(
f"Failed to download primary key database at {bcdata.PRIMARY_KEY_DB_URL}"
)
primary_keys = {}
return primary_keys


def bc2pg( # noqa: C901
dataset,
db_url,
Expand All @@ -45,11 +60,10 @@ def bc2pg( # noqa: C901
timestamp=True,
schema_only=False,
append=False,
refresh=False,
):
"""Request table definition from bcdc and replicate in postgres"""
if append and refresh:
raise ValueError("Options append and refresh are not compatible")
if schema_only and append:
raise ValueError("Options schema_only and append are not compatible")

dataset = bcdata.validate_name(dataset)
schema_name, table_name = dataset.lower().split(".")
Expand All @@ -75,18 +89,14 @@ def bc2pg( # noqa: C901

df = None # just for tracking if first download is done by geometry type check

# if appending or refreshing, get column names from db, make sure table exists
if append or refresh:
# if appending, get column names from db, make sure table exists
if append:
if schema_name + "." + table_name not in db.tables:
raise ValueError(f"{schema_name}.{table_name} does not exist")
column_names = db.get_columns(schema_name, table_name)

# clear existing data if directed by refresh option
if refresh:
db.truncate(schema_name, table_name)

# if not appending/refreshing, define and create table
if not append or refresh:
# if not appending, define and create table
if not append:
# get info about the table from catalogue
table_definition = bcdata.get_table_definition(dataset)

Expand Down Expand Up @@ -132,6 +142,12 @@ def bc2pg( # noqa: C901
if geometry_type not in SUPPORTED_TYPES:
raise ValueError("Geometry type {geometry_type} is not supported")

# if primary key is not supplied, use default (if present in list)
primary_keys = get_primary_keys()
if not primary_key and dataset.lower() in primary_keys:
primary_key = primary_keys[dataset.lower()]

# fail if specified primary key is not in the table
if primary_key and primary_key.upper() not in [
c["column_name"].upper() for c in table_definition["schema"]
]:
Expand Down
22 changes: 21 additions & 1 deletion bcdata/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from cligj import compact_opt, indent_opt, quiet_opt, verbose_opt

import bcdata
from bcdata.database import Database

LOG_FORMAT = "%(asctime)s:%(levelname)s:%(name)s: %(message)s"

Expand Down Expand Up @@ -385,6 +386,18 @@ def bc2pg(
timestamp = False
else:
timestamp = True
if refresh and append:
raise ValueError("Options append and refresh are not compatible")
if refresh and (schema == "bcdata"):
raise ValueError(
"Refreshing tables in bcdata schema is not supported, use another schema"
)
elif refresh and schema:
schema_target = schema
elif refresh and not schema:
schema_target, t = bcdata.validate_name(dataset).lower().split(".")
if refresh:
schema = "bcdata"
out_table = bcdata.bc2pg(
dataset,
db_url,
Expand All @@ -398,6 +411,13 @@ def bc2pg(
timestamp=timestamp,
schema_only=schema_only,
append=append,
refresh=refresh,
)

# if refreshing, flush from temp bcdata schema to target schema
if refresh:
db = Database(db_url)
s, table = out_table.split(".")
db.refresh(schema_target, table)
out_table = schema_target + "." + table

log.info("Load of {} to {} in {} complete".format(dataset, out_table, db_url))
24 changes: 22 additions & 2 deletions bcdata/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,34 @@ def drop_table(self, schema, table):
)
self.execute(dbq)

def truncate(self, schema, table):
def refresh(self, schema, table):
# move data from temp table to target table
if schema + "." + table in self.tables:
log.warning(f"Truncating table {schema}.{table}")
log.warning(
f"Truncating table {schema}.{table} and refreshing from bcdata.{table}"
)
dbq = sql.SQL("TRUNCATE {schema}.{table}").format(
schema=sql.Identifier(schema),
table=sql.Identifier(table),
)
self.execute(dbq)
columns = list(
set(self.get_columns("bcdata", table)).intersection(
self.get_columns(schema, table)
)
)
identifiers = [sql.Identifier(c) for c in columns]
dbq = sql.SQL(
"""INSERT INTO {schema}.{table}
({columns})
SELECT {columns} FROM bcdata.{table}"""
).format(
schema=sql.Identifier(schema),
table=sql.Identifier(table),
columns=sql.SQL(",").join(identifiers),
)
self.execute(dbq)
self.drop_table("bcdata", table)

def define_table(
self,
Expand Down
41 changes: 23 additions & 18 deletions tests/test_bc2pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,29 @@ def test_bc2pg_primary_key():
DB_CONNECTION.execute("drop table " + ASSESSMENTS_TABLE)


def test_bc2pg_get_primary_keys():
primary_keys = bcdata.get_primary_keys()
assert primary_keys[ASSESSMENTS_TABLE] == "stream_crossing_id"


def test_bc2pg_primary_key_default():
bcdata.bc2pg(ASSESSMENTS_TABLE, DB_URL, count=100)
assert ASSESSMENTS_TABLE in DB_CONNECTION.tables
r = DB_CONNECTION.query(
"""
SELECT a.attname FROM pg_index i
JOIN pg_class c ON c.oid = i.indrelid
JOIN pg_attribute a ON a.attrelid = c.oid AND a.attnum = any(i.indkey)
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE relname = 'pscis_assessment_svw'
AND nspname = 'whse_fish'
AND indisprimary
"""
)
assert r[0][0] == "stream_crossing_id"
DB_CONNECTION.execute("drop table " + ASSESSMENTS_TABLE)


def test_bc2pg_filter():
bcdata.bc2pg(
AIRPORTS_TABLE,
Expand Down Expand Up @@ -164,21 +187,3 @@ def test_bc2pg_append_to_other():
r = DB_CONNECTION.query("select * from whse_imagery_and_base_maps.arpt")
assert len(r) == 2
DB_CONNECTION.execute("drop table whse_imagery_and_base_maps.arpt")


def test_bc2pg_refresh():
bcdata.bc2pg(AIRPORTS_TABLE, DB_URL)
bcdata.bc2pg(
AIRPORTS_TABLE,
DB_URL,
query="AIRPORT_NAME='Terrace (Northwest Regional) Airport'",
)
bcdata.bc2pg(
AIRPORTS_TABLE,
DB_URL,
query="AIRPORT_NAME='Victoria International Airport'",
refresh=True,
)
r = DB_CONNECTION.query("select * from whse_imagery_and_base_maps.gsr_airports_svw")
assert len(r) == 1
DB_CONNECTION.execute("drop table whse_imagery_and_base_maps.gsr_airports_svw")
33 changes: 33 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,36 @@ def test_bc2pg():
assert result.exit_code == 0
assert AIRPORTS_TABLE.lower() in DB_CONNECTION.tables
DB_CONNECTION.execute("drop table " + AIRPORTS_TABLE.lower())


def test_bc2pg_refresh():
runner = CliRunner()
r1 = runner.invoke(
cli,
[
"bc2pg",
AIRPORTS_TABLE,
"--db_url",
DB_URL,
"--query",
"AIRPORT_NAME='Terrace (Northwest Regional) Airport'",
],
)
r2 = runner.invoke(
cli,
[
"bc2pg",
AIRPORTS_TABLE,
"--db_url",
DB_URL,
"--refresh",
"--query",
"AIRPORT_NAME='Victoria International Airport'",
],
)
q = DB_CONNECTION.query("select * from whse_imagery_and_base_maps.gsr_airports_svw")
assert r1.exit_code == 0
assert r2.exit_code == 0
assert AIRPORTS_TABLE.lower() in DB_CONNECTION.tables
assert len(q) == 1
DB_CONNECTION.execute("drop table " + AIRPORTS_TABLE.lower())