Skip to content

Commit

Permalink
Sprint January 2024 issue 593 (#612)
Browse files Browse the repository at this point in the history
* WIS2BOXSubscriber class using get_data_mappings

* flake8 fix

* uncommit port override

* rm sarracenia artifact, add namespace to EventName

* fix flake8

* Update data_mappings.py

* Update plugin.py

---------

Co-authored-by: Tom Kralidis <tomkralidis@gmail.com>
  • Loading branch information
maaikelimper and tomkralidis authored Jan 10, 2024
1 parent 190ad7d commit 85d64d2
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 98 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ services:

wis2box-api:
container_name: wis2box-api
image: ghcr.io/wmo-im/wis2box-api:1.0b6
image: ghcr.io/wmo-im/wis2box-api:1.0b7
restart: always
env_file:
- wis2box.env
Expand Down
19 changes: 14 additions & 5 deletions wis2box-management/wis2box/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from wis2box.api import (setup_collection, remove_collection,
delete_collections_by_retention,
reindex_collection)
from wis2box.data_mappings import DATADIR_DATA_MAPPINGS
from wis2box.data_mappings import get_data_mappings
from wis2box.env import (STORAGE_SOURCE, STORAGE_ARCHIVE, STORAGE_PUBLIC,
STORAGE_DATA_RETENTION_DAYS, STORAGE_INCOMING)
from wis2box.handler import Handler
Expand Down Expand Up @@ -185,9 +185,13 @@ def clean(ctx, days, verbosity):
def ingest(ctx, topic_hierarchy, path, recursive, verbosity):
"""Ingest data file or directory"""

data_mappings = get_data_mappings()

for file_to_process in walk_path(path, '.*', recursive):
click.echo(f'Processing {file_to_process}')
handler = Handler(file_to_process, topic_hierarchy)
handler = Handler(filepath=file_to_process,
topic_hierarchy=topic_hierarchy,
data_mappings=data_mappings)
rfp = handler.topic_hierarchy.dirpath
path = f'{STORAGE_INCOMING}/{rfp}/{file_to_process.name}'

Expand All @@ -207,8 +211,10 @@ def add_collection(ctx, filepath, verbosity):

meta = gcm(filepath.read())

if meta['topic_hierarchy'] not in DATADIR_DATA_MAPPINGS['data']:
data_mappings_topics = '\n'.join(DATADIR_DATA_MAPPINGS['data'].keys())
data_mappings = get_data_mappings()

if meta['topic_hierarchy'] not in data_mappings:
data_mappings_topics = '\n'.join(data_mappings.keys())
msg = (f"topic_hierarchy={meta['topic_hierarchy']} not found"
f" in data-mappings:\n\n{data_mappings_topics}")
raise click.ClickException(msg)
Expand Down Expand Up @@ -253,10 +259,13 @@ def reindex_collection_items(ctx, collection_id_source, collection_id_target):
def add_collection_items(ctx, topic_hierarchy, path, recursive, verbosity):
"""Add collection items to API backend"""

data_mappings = get_data_mappings()
click.echo(f'Adding GeoJSON files to collection: {topic_hierarchy}')
for file_to_process in walk_path(path, '.*.geojson$', recursive):
click.echo(f'Adding {file_to_process}')
handler = Handler(file_to_process, topic_hierarchy)
handler = Handler(filepath=file_to_process,
topic_hierarchy=topic_hierarchy,
data_mappings=data_mappings)
handler.publish()

click.echo('Done')
Expand Down
39 changes: 25 additions & 14 deletions wis2box-management/wis2box/data_mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,28 @@
DATADIR = os.environ.get('WIS2BOX_DATADIR', '/data/wis2box')
DATA_MAPPINGS = Path(DATADIR) / 'data-mappings.yml'

if not DATA_MAPPINGS.exists():
msg = f'Please create a data mappings file in {DATADIR}'
LOGGER.error(msg)
raise RuntimeError(msg)

try:
with DATA_MAPPINGS.open() as fh:
DATADIR_DATA_MAPPINGS = yaml_load(fh)
assert DATADIR_DATA_MAPPINGS is not None
except Exception as err:
DATADIR_DATA_MAPPINGS = None
msg = f'Missing data mappings: {err}'
LOGGER.error(msg)
raise EnvironmentError(msg)

def get_data_mappings() -> dict:
    """
    Get data mappings

    The file at `DATA_MAPPINGS` is opened and parsed on every call, so
    callers that need a stable view should keep the returned object.

    :returns: `dict` of data mappings definitions (the value of the
              top-level `data` key of the data mappings file)

    :raises RuntimeError: if the data mappings file does not exist
    :raises EnvironmentError: if the file cannot be read or parsed, is
                              empty / not a mapping, or has no top-level
                              `data` key
    """

    if not DATA_MAPPINGS.exists():
        msg = f'Please create a data mappings file in {DATADIR}'
        LOGGER.error(msg)
        raise RuntimeError(msg)

    try:
        with DATA_MAPPINGS.open() as fh:
            data_mappings = yaml_load(fh)

        # explicit checks instead of assert: assert statements are removed
        # when Python runs with -O, which would let an invalid file through
        if not isinstance(data_mappings, dict):
            raise ValueError('file is empty or is not a mapping')
        if 'data' not in data_mappings:
            raise ValueError("missing top-level 'data' key")
    except Exception as err:
        # any failure is reported as EnvironmentError, as before; the
        # original exception is chained so the root cause stays visible
        msg = f'Issue loading data mappings: {err}'
        LOGGER.error(msg)
        raise EnvironmentError(msg) from err

    return data_mappings['data']
28 changes: 15 additions & 13 deletions wis2box-management/wis2box/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@


class Handler:
def __init__(self, filepath: str, topic_hierarchy: str = None):
def __init__(self, filepath: str,
topic_hierarchy: str = None,
data_mappings: dict = None) -> None:
self.filepath = filepath
self.plugins = ()
self.input_bytes = None

LOGGER.debug('Detecting file type')
if isinstance(self.filepath, Path):
Expand All @@ -50,7 +53,9 @@ def __init__(self, filepath: str, topic_hierarchy: str = None):
LOGGER.debug('filepath is a string')
self.filetype = self.filepath.split('.')[-1]

self.is_http = self.filepath.startswith('http')
# check if filepath is a url
if self.filepath.startswith('http'):
self.input_bytes = get_data(self.filepath)

if topic_hierarchy is not None:
th = topic_hierarchy
Expand All @@ -65,7 +70,7 @@ def __init__(self, filepath: str, topic_hierarchy: str = None):

try:
self.topic_hierarchy, self.plugins = validate_and_load(
th, self.filetype, fuzzy=fuzzy)
th, data_mappings, self.filetype, fuzzy=fuzzy)
except Exception as err:
msg = f'Topic Hierarchy validation error: {err}'
raise ValueError(msg)
Expand All @@ -90,17 +95,14 @@ def publish_failure_message(self, description, plugin=None):
def handle(self) -> bool:
for plugin in self.plugins:
if not plugin.accept_file(self.filepath):
msg = f'Filepath not accepted: {self.filepath}'
LOGGER.warning(msg)
self.publish_failure_message(
description='filepath not accepted',
plugin=plugin)
msg = f'Filepath not accepted: {self.filepath} for class {plugin.__class__}' # noqa
LOGGER.debug(msg)
continue
try:
if self.is_http:
if self.input_bytes:
plugin.transform(
get_data(self.filepath),
filename=self.filepath.split('/')[-1],
input_data=self.input_bytes,
filename=self.filepath.split('/')[-1]
)
else:
plugin.transform(self.filepath)
Expand All @@ -125,8 +127,8 @@ def handle(self) -> bool:

def publish(self) -> bool:
index_name = self.topic_hierarchy.dotpath
if self.is_http:
geojson = json.load(get_data(self.filepath))
if self.input_bytes:
geojson = json.load(self.input_bytes)
upsert_collection_item(index_name, geojson)
else:
with Path(self.filepath).open() as fh1:
Expand Down
7 changes: 4 additions & 3 deletions wis2box-management/wis2box/metadata/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from wis2box import cli_helpers
from wis2box.api import (setup_collection, upsert_collection_item,
delete_collection_item)
from wis2box.data_mappings import DATADIR_DATA_MAPPINGS
from wis2box.data_mappings import get_data_mappings
from wis2box.env import API_URL, BROKER_PUBLIC, STORAGE_PUBLIC, STORAGE_SOURCE
from wis2box.metadata.base import BaseMetadata
from wis2box.plugin import load_plugin, PLUGINS
Expand Down Expand Up @@ -217,8 +217,9 @@ def publish(ctx, filepath, verbosity):
dm = DiscoveryMetadata()
record_mcf = dm.parse_record(filepath.read())

if record_mcf['wis2box']['topic_hierarchy'] not in DATADIR_DATA_MAPPINGS['data']: # noqa
data_mappings_topics = '\n'.join(DATADIR_DATA_MAPPINGS['data'].keys()) # noqa
data_mappings = get_data_mappings()
if record_mcf['wis2box']['topic_hierarchy'] not in data_mappings: # noqa
data_mappings_topics = '\n'.join(data_mappings.keys()) # noqa
msg = (f"topic_hierarchy={record_mcf['wis2box']['topic_hierarchy']} not found" # noqa
f" in data-mappings:\n\n{data_mappings_topics}")
raise click.ClickException(msg)
Expand Down
17 changes: 13 additions & 4 deletions wis2box-management/wis2box/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import logging
from typing import Any

from wis2box.data_mappings import DATADIR_DATA_MAPPINGS

LOGGER = logging.getLogger(__name__)

PLUGINS = {
Expand Down Expand Up @@ -60,7 +58,9 @@ class PluginTypes(Enum):
STORAGE = 'storage'


def load_plugin(plugin_type: PluginTypes, defs: dict) -> Any:
def load_plugin(plugin_type: PluginTypes,
defs: dict,
data_mappings: dict = None) -> Any:
"""
loads plugin by type
Expand All @@ -75,8 +75,17 @@ def load_plugin(plugin_type: PluginTypes, defs: dict) -> Any:

if plugin_type in ['api_backend', 'api_config', 'pubsub', 'storage']:
plugin_mappings = PLUGINS
elif plugin_type == 'data':
if data_mappings is None:
msg = 'data mappings are undefined'
LOGGER.error(msg)
raise ValueError(msg)
else:
plugin_mappings = {'data': data_mappings}
else:
plugin_mappings = DATADIR_DATA_MAPPINGS
msg = f'Invalid plugin type: {plugin_type}'
LOGGER.error(msg)
raise InvalidPluginError(msg)

# check code path is valid
if '.' not in codepath:
Expand Down
115 changes: 63 additions & 52 deletions wis2box-management/wis2box/pubsub/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
import json
import logging
import multiprocessing as mp
from pathlib import Path

from time import sleep

import click

from wis2box.api import upsert_collection_item
from wis2box import cli_helpers
from wis2box.api import setup_collection
from wis2box.data_mappings import get_data_mappings
from wis2box.env import (BROKER_HOST, BROKER_PORT, BROKER_USERNAME,
BROKER_PASSWORD, STORAGE_SOURCE, STORAGE_ARCHIVE)
from wis2box.handler import Handler, NotHandledError
Expand All @@ -39,55 +40,66 @@
LOGGER = logging.getLogger(__name__)


def handle(filepath):
try:
LOGGER.info(f'Processing {filepath}')
handler = Handler(filepath)
if handler.handle():
LOGGER.info('Data processed')
for plugin in handler.plugins:
for filepath in plugin.files():
LOGGER.info(f'Public filepath: {filepath}')
except NotHandledError as err:
msg = f'not handled error: {err}'
LOGGER.debug(msg)
except ValueError as err:
msg = f'handle() error: {err}'
LOGGER.error(msg)
except Exception as err:
msg = f'handle() error: {err}'
raise err


def on_message_handler(client, userdata, msg):
LOGGER.debug(f'Raw message: {msg.payload}')

topic = msg.topic
message = json.loads(msg.payload)
LOGGER.info(f'Incoming message on topic {topic}')
if topic == 'wis2box/notifications':
LOGGER.info(f'Notification: {message}')
# store notification in messages collection
upsert_collection_item('messages', message)
else:
if message.get('EventName') == 's3:ObjectCreated:Put':
LOGGER.debug('Incoming data is an s3 data object')
key = str(message['Key'])
filepath = f'{STORAGE_SOURCE}/{key}'
if key.startswith(STORAGE_ARCHIVE):
LOGGER.info(f'Do not process archived-data: {key}')
return
elif 'relPath' in message:
LOGGER.debug('Incoming data is a filesystem path')
filepath = Path(message['relPath'])
class WIS2BoxSubscriber:

def __init__(self, broker):
self.data_mappings = get_data_mappings()
self.broker = broker
self.broker.bind('on_message', self.on_message_handler)
self.broker.sub('wis2box/#')

def handle(self, filepath, message):
try:
LOGGER.info(f'Processing {message["EventName"]} for {filepath}')
# load handler
handler = Handler(filepath=filepath,
data_mappings=self.data_mappings)
if handler.handle():
LOGGER.info('Data processed')
for plugin in handler.plugins:
for filepath in plugin.files():
LOGGER.info(f'Public filepath: {filepath}')
except NotHandledError as err:
msg = f'not handled error: {err}'
LOGGER.debug(msg)
except ValueError as err:
msg = f'handle() error: {err}'
LOGGER.error(msg)
except Exception as err:
msg = f'handle() error: {err}'
raise err

def on_message_handler(self, client, userdata, msg):
LOGGER.debug(f'Raw message: {msg.payload}')

topic = msg.topic
message = json.loads(msg.payload)
LOGGER.info(f'Incoming message on topic {topic}')
filepath = None
if topic == 'wis2box/notifications':
LOGGER.info(f'Notification: {message}')
# store notification in messages collection
upsert_collection_item('messages', message)
else:
LOGGER.debug('ignore message')
return
if message.get('EventName') == 's3:ObjectCreated:Put':
LOGGER.debug('Received s3:ObjectCreated:Put')
key = str(message['Key'])
filepath = f'{STORAGE_SOURCE}/{key}'
if key.startswith(STORAGE_ARCHIVE):
LOGGER.info(f'Do not process archived-data: {key}')
return
elif message.get('EventName') == 'wis2box:ReloadMappingRequest':
LOGGER.debug('Received ReloadMappingRequest')
self.data_mappings = get_data_mappings()
return
else:
LOGGER.debug('ignore message')
return

while len(mp.active_children()) == mp.cpu_count():
sleep(0.1)
p = mp.Process(target=handle, args=(filepath,))
p.start()
while len(mp.active_children()) == mp.cpu_count():
sleep(0.1)
p = mp.Process(target=self.handle, args=(filepath, message))
p.start()


@click.command()
Expand All @@ -106,7 +118,6 @@ def subscribe(ctx, verbosity):

broker = load_plugin('pubsub', defs)

broker.bind('on_message', on_message_handler)

click.echo('Subscribing to internal broker on topic wis2box/#')
broker.sub('wis2box/#')
# start the wis2box subscriber
click.echo('Starting WIS2Box subscriber')
WIS2BoxSubscriber(broker=broker)
Loading

0 comments on commit 85d64d2

Please sign in to comment.