Skip to content

Commit

Permalink
Update plugin based on new mapping input
Browse files Browse the repository at this point in the history
  • Loading branch information
webb-ben committed Jun 13, 2024
1 parent e55f1f5 commit 3a47be1
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 69 deletions.
5 changes: 3 additions & 2 deletions docker/sta.env
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ http_cors_allowed_origins=*
# MQTT
bus_mqttBroker=tcp://${WIS2BOX_BROKER_HOST}:${WIS2BOX_BROKER_PORT}
bus_busImplementationClass=de.fraunhofer.iosb.ilt.sta.messagebus.MqttMessageBus
bus_sendQueueSize=500
bus_sendQueueSize=1000
bus_sendWorkerPoolSize=3

# Plugins
# plugins.multiDatastream.enable=true
plugins.coreModel.idType=STRING
plugins_coreModel_idType=STRING
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
isodate
minio
OWSLib
paho-mqtt
paho-mqtt<2
pygeometa
PyYAML
requests
4 changes: 4 additions & 0 deletions wis2box/api/backend/sensorthings.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ def delete_collection_item(self, collection_id: str, item_id: str) -> str:

LOGGER.debug(f'Deleting {item_id} from {collection_id}')
sta_index = self.sta_id(collection_id)
try:
item_id = int(item_id)
except ValueError:
item_id = f"'{item_id}'"
try:
self.http.delete(f'{sta_index}({item_id})')
except Exception as err:
Expand Down
148 changes: 101 additions & 47 deletions wis2box/data/csv2sta.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
from io import StringIO
import logging
from pathlib import Path
from requests import Session
from typing import Union

from wis2box.env import NLDI_URL
from wis2box.data.geojson import ObservationDataGeoJSON
from wis2box.util import make_uuid
from wis2box.util import make_uuid, url_join

LOGGER = logging.getLogger(__name__)

Expand All @@ -45,29 +47,69 @@ def transform(self, input_data: Union[Path, bytes],
fh = StringIO(input_bytes.decode())
reader = DictReader(fh)

http = Session()

for row in reader:
monitoring_location_identifier = \
row['MonitoringLocationIdentifier']
url = url_join(NLDI_URL, monitoring_location_identifier)
try:
result = http.get(url)
feature = result.json()['features'][0]
except KeyError:
msg = f'Could not discover {monitoring_location_identifier}'
LOGGER.info(msg)
continue

identifier = row['ResultIdentifier']
unitOfMeasurement = row['ResultMeasure/MeasureUnitCode'] or row['DetectionQuantitationLimitMeasure/MeasureUnitCode'] # noqa
datastream = make_uuid(f"{row['CharacteristicName']}-{row['MonitoringLocationIdentifier']}-{unitOfMeasurement}") # noqa

_ = f"{row['ActivityStartDate']} {row['ActivityStartTime/Time']}"
isodate = datetime.strptime(
_, '%Y-%m-%d %H:%M:%S'
).replace(tzinfo=timezone(row['ActivityStartTime/TimeZoneCode']))
_ = ' '.join([row['ActivityStartDate'], row['ActivityStartTime/Time']]) # noqa
try:
isodate = datetime.strptime(_, '%Y-%m-%d %H:%M:%S')
except ValueError:
isodate = datetime.strptime(_, '%Y-%m-%d ')
try:
isodate = isodate.replace(
tzinfo=timezone(row['ActivityStartTime/TimeZoneCode']))
except Exception:
LOGGER.info('Could not apply time zone information')

rowdate = isodate.strftime('%Y-%m-%dT%H:%M:%SZ')
isodate = isodate.strftime('%Y%m%dT%H%M%S')

LongitudeMeasure = row['ActivityLocation/LongitudeMeasure'] # noqa
LatitudeMeasure = row['ActivityLocation/LatitudeMeasure'] # noqa
try:
analysisStartDate = datetime.strptime(
row['AnalysisStartDate'], '%Y-%m-%d'
).strftime('%Y-%m-%dT%H:%M:%SZ')
except ValueError:
analysisStartDate = rowdate

try:
LongitudeMeasure = float(row['ActivityLocation/LongitudeMeasure']) # noqa
LatitudeMeasure = float(row['ActivityLocation/LatitudeMeasure']) # noqa
geom = {
'type': 'Point',
'coordinates': [LongitudeMeasure, LatitudeMeasure]
}
except ValueError:
geom = feature['geometry']

try:
result = float(row['ResultMeasureValue'])
except ValueError:
result = row['ResultDetectionConditionText']

if not result:
LOGGER.warning(f'No results for {identifier}')
continue

resultQuality = {
'detectionCondition': row['ResultDetectionConditionText'],
'precision': row['DataQuality/PrecisionValue'],
'accuracy': row['DataQuality/BiasValue'],
'detectionLimit': {
'value': row['DetectionQuantitationLimitMeasure/MeasureValue'], # noqa
'unit': row['DetectionQuantitationLimitMeasure/MeasureUnitCode'] # noqa
}
}
resultQuality = (row['MeasureQualifierCode'] or row['ResultStatusIdentifier']) or ' '.join([ # noqa
row['ResultDetectionQuantitationLimitUrl'],
row['DetectionQuantitationLimitMeasure/MeasureValue'],
Expand All @@ -82,57 +124,69 @@ def transform(self, input_data: Union[Path, bytes],
},
'geojson': {
'phenomenonTime': rowdate,
'resultTime': rowdate,
'resultTime': analysisStartDate,
'result': result,
'resultQuality': resultQuality,
'parameters': {
'ResultCommentText': row['ResultCommentText'],
'HydrologicCondition': row['HydrologicCondition'],
'HydrologicEvent': row['HydrologicEvent']
'hydrologicCondition': row['HydrologicCondition'],
'hydrologicEvent': row['HydrologicEvent'],
'modified': row['LastUpdated'],
'status': row['ResultStatusIdentifier'],
'publisher': row['ProviderName'],
'valueType': row['ResultValueTypeName'],
'comment': row['ResultCommentText']
},
'Datastream': {'@iot.id': datastream},
'FeatureOfInterest': {
'@iot.id': datastream,
'name': row['MonitoringLocationName'],
'description': row['MonitoringLocationName'],
'encodingType': 'application/vnd.geo+json',
'feature': {
'type': 'Point',
'coordinates': [LongitudeMeasure, LatitudeMeasure]
},
'encodingType': 'application/geo+json',
'feature': geom,
},
}
}

try:
depth = float(row['ActivityDepthHeightMeasure/MeasureValue'])
LOGGER.info('Adding samplings')
deployment_info = row['ActivityTypeCode'] in (
'Field Msr/Obs-Portable Data Logger', 'Field Msr/Obs')
if not deployment_info:
LOGGER.info('Adding Sampling Entity')
sampling_name = '-'.join([
row['MonitoringLocationIdentifier'],
row['ActivityIdentifier']
])
samplingProcedure_id = '-'.join([
row['SampleCollectionMethod/MethodIdentifierContext'],
row['SampleCollectionMethod/MethodIdentifier']
])
featureOfInterest = self.output_data[identifier]['geojson']['FeatureOfInterest'] # noqa
featureOfInterest['Samplings'] = [{
'name': row['ActivityTypeCode'],
'description': row['ActivityTypeCode'] + row['ActivityRelativeDepthName'], # noqa
'atDepth': depth, # noqa
'depthUom': row['ActivityDepthHeightMeasure/MeasureUnitCode'], # noqa
'encodingType': 'application/vnd.geo+json',
'samplingLocation': {
'type': 'Point',
'coordinates': [LongitudeMeasure, LatitudeMeasure]
},
'Thing': {
'@iot.id': row['MonitoringLocationIdentifier']
},
'Sampler': {
'name': row['OrganizationFormalName'],
'SamplingProcedure': {
'name': row['ActivityTypeCode']

try:
featureOfInterest['Samplings'] = [{
'name': sampling_name,
'description': row['ActivityTypeCode'] + row['ActivityRelativeDepthName'], # noqa
'depthUom': row['ResultDepthHeightMeasure/MeasureUnitCode'], # noqa
'encodingType': 'application/geo+json',
# 'samplingLocation': geom,
'Thing': {
'@iot.id': row['MonitoringLocationIdentifier']
},
'Sampler': {
'name': row['OrganizationFormalName'],
'SamplingProcedure': {
'@iot.id': make_uuid(samplingProcedure_id),
'name': row['SampleCollectionMethod/MethodName'], # noqa
'definition': row['SampleCollectionMethod/MethodDescriptionText'], # noqa
'description': row['SampleCollectionMethod/MethodDescriptionText'] # noqa
}
}
},
'SamplingProcedure': {
'name': row['ActivityTypeCode']
}
}]
except (TypeError, ValueError):
LOGGER.info('No Sampling detected')
}]
if row['ActivityDepthHeightMeasure/MeasureValue']:
featureOfInterest['Samplings'][0]['atDepth'] = \
row['ActivityDepthHeightMeasure/MeasureValue']

except (TypeError, ValueError):
LOGGER.error('No Sampling detected')

def __repr__(self):
    """Return a fixed, human-readable identifier for this transformer."""
    return '<ObservationDataCSV>'
1 change: 1 addition & 0 deletions wis2box/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
THINGS = 'Things'

GEOCONNEX = 'https://geoconnex.us/'
NLDI_URL = 'https://labs.waterdata.usgs.gov/api/nldi/linked-data/wqp'
WQP_URL = 'https://www.waterqualitydata.us'
STATION_URL = url_join(WQP_URL, 'data/Station/search')
RESULTS_URL = url_join(WQP_URL, 'data/Result/search')
Expand Down
Loading

0 comments on commit 3a47be1

Please sign in to comment.