Add WQP
Add WQX ETL code
webb-ben committed Nov 16, 2023
1 parent e3e7294 commit e55f1f5
Showing 11 changed files with 60,489 additions and 148 deletions.
8 changes: 4 additions & 4 deletions wis2box/api/__init__.py
@@ -93,7 +93,8 @@ def remove_collection(name: str) -> bool:
     return True


-def upsert_collection_item(collection_id: str, item: dict) -> str:
+def upsert_collection_item(collection_id: str, item: dict,
+                           method: str = 'POST') -> str:
     """
     Add or update a collection item
@@ -103,9 +104,8 @@ def upsert_collection_item(collection_id: str, item: dict) -> str:
     :returns: `str` identifier of added item
     """
     backend = load_backend()
-    backend.upsert_collection_items(collection_id, [item])
-
-    return True
+    if backend.upsert_collection_items(collection_id, [item], method):
+        return True


 def delete_collection_item(collection_id: str, item_id: str) -> str:
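The upsert helper now forwards an HTTP method and reports success as a boolean. A minimal sketch of the new call shape (the collection id, payload fields, and '@iot.id' value are illustrative, not taken from the commit):

```python
from wis2box.api import upsert_collection_item

# Illustrative Observation payload; field values are made up
observation = {
    '@iot.id': 'demo-observation-1',
    'phenomenonTime': '2023-11-16T00:00:00Z',
    'resultTime': '2023-11-16T00:00:00Z',
    'result': 4.2
}

# Default POST creates the item; PATCH updates the entity
# addressed by its '@iot.id'
upsert_collection_item('iow.demo.Observations', observation)
upsert_collection_item('iow.demo.Observations', observation, method='PATCH')
```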
22 changes: 17 additions & 5 deletions wis2box/api/backend/sensorthings.py
@@ -19,13 +19,13 @@
 #
 ###############################################################################

-import json
 import logging

 from requests import Session
 from typing import Tuple

 from wis2box.api.backend.base import BaseBackend
+from wis2box.util import url_join, to_json

 LOGGER = logging.getLogger(__name__)

@@ -43,7 +43,7 @@ def __init__(self, defs: dict) -> None:
         super().__init__(defs)

         self.type = 'SensorThings'
-        self.url = defs.get('url').rstrip('/')
+        self.url = url_join(defs.get('url'))
         self.http = Session()

     def sta_id(self, collection_id: str) -> Tuple[str]:
@@ -54,7 +54,8 @@ def sta_id(self, collection_id: str) -> Tuple[str]:
         :returns: `str` of STA index
         """
-        return self.url + '/' + collection_id.split('.').pop()
+        entity = collection_id.split('.').pop()
+        return url_join(self.url, entity)

     def add_collection(self, collection_id: str) -> dict:
         """
@@ -86,7 +87,8 @@ def has_collection(self, collection_id: str) -> bool:
         """
         return collection_id != ''

-    def upsert_collection_items(self, collection_id: str, items: list) -> str:
+    def upsert_collection_items(self, collection_id: str, items: list,
+                                method: str = 'POST') -> str:
         """
         Add or update collection items
@@ -98,7 +100,17 @@ def upsert_collection_items(self, collection_id: str, items: list) -> str:
         sta_index = self.sta_id(collection_id)

         for entity in items:
-            self.http.post(sta_index, json.dumps(entity))
+            if method == 'PATCH':
+                item_id = entity['@iot.id']
+                url = f'''{sta_index}('{item_id}')'''
+                r = self.http.patch(url, data=to_json(entity))
+            else:
+                r = self.http.post(sta_index, data=to_json(entity))
+
+            if not r.ok:
+                LOGGER.error(r.content)
+                return False
+        return True

     def delete_collection_item(self, collection_id: str, item_id: str) -> str:
         """
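The PATCH branch relies on the SensorThings API convention of addressing a single entity by appending its id in parentheses to the collection URL. A standalone sketch against a generic STA endpoint (the base URL and id below are illustrative, not from the commit):

```python
from requests import Session

# Illustrative SensorThings base URL; not taken from the commit
STA = 'http://localhost:8080/FROST-Server/v1.1'

entity = {'@iot.id': 'demo-thing', 'name': 'Renamed thing'}

# Single-entity addressing: <collection>('<@iot.id>')
url = f"{STA}/Things('{entity['@iot.id']}')"
r = Session().patch(url, json=entity)
print(r.status_code, r.ok)
```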
108 changes: 68 additions & 40 deletions wis2box/data/csv2sta.py
@@ -19,15 +19,16 @@
 #
 ###############################################################################

-import csv
+from csv import DictReader
 from datetime import datetime
+from pytz import timezone
 from io import StringIO
-import json
 import logging
 from pathlib import Path
 from typing import Union

 from wis2box.data.geojson import ObservationDataGeoJSON
+from wis2box.util import make_uuid

 LOGGER = logging.getLogger(__name__)

@@ -42,69 +43,96 @@ def transform(self, input_data: Union[Path, bytes],
         input_bytes = self.as_bytes(input_data)

         fh = StringIO(input_bytes.decode())
-        reader = csv.reader(fh, delimiter=',', quoting=csv.QUOTE_NONNUMERIC)
-
-        # read in header rows
-        rows_read = 0
-        skip = 7
-        while rows_read <= skip:
-            row = next(reader)
-            if rows_read == 3:
-                loc_names = row
-            elif rows_read == 4:
-                loc = row
-            elif rows_read == skip:
-                col_names = row
-            rows_read += 1
-
-        location = dict(zip(loc_names, loc))
-        location['Coordinates'] = location.get('Coordinates (long, lat)', loc[2])  # noqa
-        location['Coordinates'] = location['Coordinates'].replace('(', '[')
-        location['Coordinates'] = location['Coordinates'].replace(')', ']')
-        LOGGER.debug(location['Coordinates'])
-        location['Coordinates'] = json.loads(location['Coordinates'])
-        LOGGER.debug('Processing data from ' + location['Location'])
+        reader = DictReader(fh)

         for row in reader:
-            data_dict = dict(zip(col_names, row))
+            identifier = row['ResultIdentifier']
+            unitOfMeasurement = row['ResultMeasure/MeasureUnitCode'] or row['DetectionQuantitationLimitMeasure/MeasureUnitCode']  # noqa
+            datastream = make_uuid(f"{row['CharacteristicName']}-{row['MonitoringLocationIdentifier']}-{unitOfMeasurement}")  # noqa

-            datastream = filename.split('_').pop(0)
+            _ = f"{row['ActivityStartDate']} {row['ActivityStartTime/Time']}"
             isodate = datetime.strptime(
-                data_dict.get('Datetime (UTC)'), '%Y-%m-%d %H:%M:%S'
-            )
-            data_date = isodate.strftime('%Y-%m-%dT%H:%M:%SZ')
+                _, '%Y-%m-%d %H:%M:%S'
+            ).replace(tzinfo=timezone(row['ActivityStartTime/TimeZoneCode']))
+            rowdate = isodate.strftime('%Y-%m-%dT%H:%M:%SZ')
+            isodate = isodate.strftime('%Y%m%dT%H%M%S')

+            LongitudeMeasure = row['ActivityLocation/LongitudeMeasure']  # noqa
+            LatitudeMeasure = row['ActivityLocation/LatitudeMeasure']  # noqa
             try:
-                result = float(data_dict['Result'])
+                result = float(row['ResultMeasureValue'])
             except ValueError:
-                result = data_dict['Result']
+                result = row['ResultDetectionConditionText']

+            if not result:
+                LOGGER.warning(f'No results for {identifier}')
+                continue

-            identifier = f'{datastream}_{isodate}'
+            resultQuality = (row['MeasureQualifierCode'] or row['ResultStatusIdentifier']) or ' '.join([  # noqa
+                row['ResultDetectionQuantitationLimitUrl'],
+                row['DetectionQuantitationLimitMeasure/MeasureValue'],
+                row['DetectionQuantitationLimitMeasure/MeasureUnitCode']
+            ])
             LOGGER.debug(f'Publishing with ID {identifier}')
             self.output_data[identifier] = {
                 '_meta': {
                     'identifier': identifier,
-                    'data_date': data_date,
-                    'relative_filepath': self.get_local_filepath(data_date)
+                    'rowdate': rowdate,
+                    'relative_filepath': self.get_local_filepath(rowdate)
                 },
                 'geojson': {
-                    'phenomenonTime': data_date,
-                    'resultTime': data_date,
+                    'phenomenonTime': rowdate,
+                    'resultTime': rowdate,
                     'result': result,
+                    'resultQuality': resultQuality,
+                    'parameters': {
+                        'ResultCommentText': row['ResultCommentText'],
+                        'HydrologicCondition': row['HydrologicCondition'],
+                        'HydrologicEvent': row['HydrologicEvent']
+                    },
                     'Datastream': {'@iot.id': datastream},
                     'FeatureOfInterest': {
                         '@iot.id': datastream,
-                        'name': location.get('Location'),
-                        'description': data_dict.get('Parameter'),
+                        'name': row['MonitoringLocationName'],
+                        'description': row['MonitoringLocationName'],
                         'encodingType': 'application/vnd.geo+json',
                         'feature': {
                             'type': 'Point',
-                            'coordinates': location['Coordinates']
-                        }
+                            'coordinates': [LongitudeMeasure, LatitudeMeasure]
+                        },
                     },
                 }
             }

+            try:
+                depth = float(row['ActivityDepthHeightMeasure/MeasureValue'])
+                LOGGER.info('Adding samplings')
+                featureOfInterest = self.output_data[identifier]['geojson']['FeatureOfInterest']  # noqa
+                featureOfInterest['Samplings'] = [{
+                    'name': row['ActivityTypeCode'],
+                    'description': row['ActivityTypeCode'] + row['ActivityRelativeDepthName'],  # noqa
+                    'atDepth': depth,
+                    'depthUom': row['ActivityDepthHeightMeasure/MeasureUnitCode'],  # noqa
+                    'encodingType': 'application/vnd.geo+json',
+                    'samplingLocation': {
+                        'type': 'Point',
+                        'coordinates': [LongitudeMeasure, LatitudeMeasure]
+                    },
+                    'Thing': {
+                        '@iot.id': row['MonitoringLocationIdentifier']
+                    },
+                    'Sampler': {
+                        'name': row['OrganizationFormalName'],
+                        'SamplingProcedure': {
+                            'name': row['ActivityTypeCode']
+                        }
+                    },
+                    'SamplingProcedure': {
+                        'name': row['ActivityTypeCode']
+                    }
+                }]
+            except (TypeError, ValueError):
+                LOGGER.info('No Sampling detected')

     def __repr__(self):
         return '<ObservationDataCSV>'
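The parser now derives the Datastream id by hashing the characteristic, monitoring location, and unit into a stable identifier via wis2box.util.make_uuid. Assuming make_uuid is a deterministic UUIDv5-style helper (its implementation is not shown in this commit), the idea looks like:

```python
import uuid

def make_uuid(value: str) -> str:
    # Assumed behaviour: a deterministic UUID derived from the input
    # string, so the same (characteristic, location, unit) triple always
    # maps to the same Datastream across ETL runs
    return str(uuid.uuid5(uuid.NAMESPACE_URL, value))

# Illustrative WQP column values
key = 'Dissolved oxygen (DO)-USGS-05288705-mg/l'
print(make_uuid(key))  # same output on every run
```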
66 changes: 29 additions & 37 deletions wis2box/data/observation.py
@@ -22,26 +22,23 @@
 import click
 import csv
 from datetime import datetime, timedelta
+from io import BytesIO, TextIOWrapper
 from json.decoder import JSONDecodeError
 import logging
 from pathlib import Path
 from requests import Session, RequestException
 from typing import Union
+from zipfile import ZipFile

 from wis2box import cli_helpers
 from wis2box.api import setup_collection
 from wis2box.data.base import BaseAbstractData
-from wis2box.env import DATADIR, DOCKER_API_URL, STORAGE_INCOMING
+from wis2box.env import (STORAGE_INCOMING, WQP_URL, STATIONS, RESULTS_URL)
 from wis2box.storage import put_data
 from wis2box.topic_hierarchy import validate_and_load

 LOGGER = logging.getLogger(__name__)

-USBR_URL = 'https://data.usbr.gov/rise/api/result/download'
-
-STATION_METADATA = DATADIR / 'metadata' / 'station'
-STATIONS = STATION_METADATA / 'location_data.csv'


 def gcm() -> dict:
     """
@@ -54,8 +51,8 @@ def gcm() -> dict:
         'id': 'Observations',
         'title': 'Observations',
         'description': 'SensorThings API Observations',
-        'keywords': ['observation', 'dam'],
-        'links': ['https://data.usbr.gov/rise-api'],
+        'keywords': ['observation', 'wqp'],
+        'links': [WQP_URL],
         'bbox': [-180, -90, 180, 90],
         'time_field': 'resultTime',
         'id_field': '@iot.id'
@@ -81,11 +78,11 @@ def __init__(self, defs: dict) -> None:

     @property
     def begin(self):
-        return self._begin.strftime('%Y-%m-%dT')
+        return self._begin.strftime('%m-%d-%Y')

     @property
     def end(self):
-        return self._end.strftime('%Y-%m-%dT')
+        return self._end.strftime('%m-%d-%Y')

     def set_date(self, begin: str = '', end: str = '') -> None:
         """
@@ -112,7 +109,6 @@ def _get_response(self, url: str, params: dict = {}):
         :returns: STA response
         """
         r = self.http.get(url, params=params)
-
         if r.ok:
             try:
                 response = r.json()
@@ -129,21 +125,27 @@ def transform(
         self, input_data: Union[Path, bytes], filename: str = ''
     ) -> bool:
         rmk = f'{input_data}_{self.begin}_{self.end}'

         params = {
-            'type': 'csv',
-            'after': self.begin,
-            'before': self.end,
-            'itemId': input_data,
-            'filename': f'{rmk}.csv'
+            'zip': 'yes',
+            'siteid': input_data,
+            'mimeType': 'csv',
+            'startDateLo': self.begin,
+            'startDateHi': self.end,
+            'dataProfile': 'resultPhysChem'
         }
-        data = self._get_response(USBR_URL, params)
-        bytes = self.as_bytes(data)

-        if 'No data' in str(bytes):
+        response = self._get_response(RESULTS_URL, params)
+        zipfiles = ZipFile(BytesIO(response))
+        [zipfile] = zipfiles.namelist()
+        with zipfiles.open(zipfile) as fh:
+            data = TextIOWrapper(fh, 'utf-8').read()
+        bytes = self.as_bytes(data)
+
+        if len(bytes) <= 2278:
             LOGGER.warning(f'No data for {rmk}')
         else:
             path = f'{STORAGE_INCOMING}/{self.local_filepath(self.end)}/{rmk}.csv'  # noqa
-            put_data(data, path)
+            put_data(bytes, path)
             LOGGER.debug('Finished processing subset')

     def local_filepath(self, date_):
@@ -155,8 +157,6 @@ def __repr__(self):


 def sync_datastreams(station_id, begin, end):
-    url = DOCKER_API_URL + '/collections/datastreams/items'
-
     _, plugins = validate_and_load('iow.demo.Observations')
     [plugin] = [p for p in plugins
                 if isinstance(p, ObservationDataDownload)]
@@ -166,19 +166,11 @@ def sync_datastreams(station_id, begin, end):
     if end:
         plugin.set_date(end=end)

-    params = {'Thing': station_id, 'resulttype': 'hits'}
-    response = plugin._get_response(url=url, params=params)
-    hits = response.get('numberMatched', 10000)
-
-    params = {'Thing': station_id, 'limit': hits}
-    datastreams = plugin._get_response(url=url, params=params)
-
-    for datastream in datastreams['features']:
-        try:
-            plugin.transform(datastream['id'], datastream['id'])
-        except Exception as err:
-            LOGGER.error(datastream['id'])
-            LOGGER.error(err)
+    try:
+        plugin.transform(station_id)
+    except Exception as err:
+        LOGGER.error(station_id)
+        LOGGER.error(err)


 @click.group()
@@ -211,7 +203,7 @@ def ingest(ctx, station, begin, end, verbosity):
     with STATIONS.open() as fh:
         reader = csv.DictReader(fh)
         for row in reader:
-            station = row['station_identifier']
+            station = row['MonitoringLocationIdentifier']
             try:
                 sync_datastreams(station, begin, end)
             except Exception as err:
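The new download path targets the Water Quality Portal's Result search service, which returns a zip containing a single CSV. A standalone sketch of the request the plugin now issues (the URL is an assumption about what RESULTS_URL holds in wis2box.env, and the site id is illustrative):

```python
from io import BytesIO, TextIOWrapper
from zipfile import ZipFile

import requests

# Assumed value of RESULTS_URL; the commit reads it from wis2box.env
WQP_RESULTS = 'https://www.waterqualitydata.us/data/Result/search'

params = {
    'zip': 'yes',
    'siteid': 'USGS-05288705',    # illustrative monitoring location
    'mimeType': 'csv',
    'startDateLo': '11-01-2023',  # WQP expects MM-DD-YYYY
    'startDateHi': '11-16-2023',
    'dataProfile': 'resultPhysChem'
}

r = requests.get(WQP_RESULTS, params=params)
zf = ZipFile(BytesIO(r.content))
[name] = zf.namelist()            # one CSV per result download
with zf.open(name) as fh:
    csv_text = TextIOWrapper(fh, 'utf-8').read()
print(csv_text[:200])
```

The MM-DD-YYYY date strings match the begin/end properties reworked above, which now format with strftime('%m-%d-%Y').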