simplify and finalize flatfile metadata
rizac committed Sep 15, 2023
1 parent 4018530 commit ee9bc78
Showing 4 changed files with 187 additions and 142 deletions.
63 changes: 17 additions & 46 deletions egsim/smtk/flatfile/__init__.py
@@ -14,7 +14,8 @@
from openquake.hazardlib.scalerel import PeerMSR
from openquake.hazardlib.contexts import RuptureContext

from .columns import read_column_metadata, ColumnDtype
from .columns import (ColumnDtype, get_rupture_param_columns,
get_dtypes_and_defaults, get_column_names)
from .. import get_SA_period
from ...smtk.trellis.configure import vs30_to_z1pt0_cy14, vs30_to_z2pt5_cb14

@@ -29,14 +30,8 @@ def read_flatfile(filepath_or_buffer: str, sep: str = None) -> pd.DataFrame:
:param sep: the separator (or delimiter). None means 'infer' (it might
take more time)
"""
return read_csv(filepath_or_buffer, sep=sep, dtype=_c_dtype,
defaults=_c_default)


# Column names and aliases, mapped to their dtype (lazy loaded, see __getattr__):
_c_dtype: dict[str, Union[str, pd.CategoricalDtype]]
# Column names and aliases, mapped to their default (lazy loaded, see __getattr__):
_c_default: dict[str, Any]
dtypes, defaults = get_dtypes_and_defaults()
return read_csv(filepath_or_buffer, sep=sep, dtype=dtypes, defaults=defaults)
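With this change, dtypes and defaults are computed from the YAML metadata at call time rather than read from lazily loaded module globals. A minimal usage sketch (the CSV path is hypothetical):

    import pandas as pd

    from egsim.smtk.flatfile import read_flatfile

    # "records.csv" is a hypothetical flatfile; sep=None lets the reader infer it:
    dataframe: pd.DataFrame = read_flatfile("records.csv")
    print(dataframe.dtypes)  # columns are cast according to the YAML metadata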


missing_values = ("", "null", "NULL", "None",
@@ -363,7 +358,7 @@ def query(flatfile: pd.DataFrame, query_expression: str) -> pd.DataFrame:

def get_column_name(flatfile:pd.DataFrame, column:str) -> str:
ff_cols = set(flatfile.columns)
cols = _c_alias[column] & ff_cols
cols = get_column_names(column) & ff_cols
if len(cols) > 1:
raise ConflictingColumns(*cols)
elif len(cols) == 0:
@@ -373,11 +368,6 @@ def get_column_name(flatfile:pd.DataFrame, column:str) -> str:
return next(iter(cols))
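`get_column_name` now resolves aliases through `get_column_names` instead of the removed `_c_alias` global. A sketch of the intended behavior, assuming a column 'magnitude' registered in the YAML with alias 'mag' (both names are illustrative):

    import pandas as pd

    from egsim.smtk.flatfile import get_column_name, MissingColumn

    flatfile = pd.DataFrame({"mag": [4.5, 5.0]})  # hypothetical flatfile
    try:
        name = get_column_name(flatfile, "magnitude")  # -> "mag" (the alias found)
    except MissingColumn:
        name = None  # neither "magnitude" nor any of its aliases is a column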


# Column name and aliases, mapped to all their aliases (lazy loaded, see __getattr__).
# The dict values will always include at least the column name itself:
_c_alias: dict[str, set[str]]


def get_event_id_column_names(flatfile: pd.DataFrame) -> list[str]:
try:
return [get_column_name(flatfile, 'event_id')]
@@ -592,11 +582,16 @@ def fill_na(flatfile:pd.DataFrame,
class EventContext(RuptureContext):
"""A RuptureContext accepting a flatfile (pandas DataFrame) as input"""

rupture_params:set[str] = None

def __init__(self, flatfile: pd.DataFrame):
super().__init__()
if not isinstance(flatfile.index, IntegerIndex):
raise ValueError('flatfile index should be made of unique integers')
self._flatfile = flatfile
if self.__class__.rupture_params is None:
# compute rupture params lazily, once, shared by all instances:
self.__class__.rupture_params = get_rupture_param_columns()

def __eq__(self, other): # FIXME: legacy code, is it still used?
assert isinstance(other, EventContext) and \
@@ -613,6 +608,7 @@ def sids(self) -> IntegerIndex:
# delete or rename. See superclass for details
return self._flatfile.index


def __getattr__(self, column_name):
"""Return a non-found Context attribute by searching in the underlying
flatfile column. Raises AttributeError (as usual) if `item` is not found
@@ -621,15 +617,11 @@ def __getattr__(self, column_name):
values = self._flatfile[column_name].values
except KeyError:
raise MissingColumn(column_name)
if column_name in _c_rupture_params:
if column_name in self.rupture_params:
values = values[0]
return values
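The rupture-parameter names are now cached once on the class instead of being read from the removed `_c_rupture_params` global; attribute access then returns a scalar for rupture parameters (constant within an event) and the full column values otherwise. A sketch with hypothetical column names, assuming 'mag' is registered as a rupture parameter and the default integer index satisfies the `IntegerIndex` check:

    import pandas as pd

    from egsim.smtk.flatfile import EventContext

    flatfile = pd.DataFrame({"mag": [5.0, 5.0], "rrup": [10.0, 22.5]})
    ctx = EventContext(flatfile)
    print(ctx.mag)   # scalar (first value), since "mag" is a rupture parameter
    print(ctx.rrup)  # numpy array with one value per record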


# Column names and alies denoting rupture params (lazy loaded, see __getattr__):
_c_rupture_params: set[str]


class InvalidColumn(Exception):
"""
General flatfile column(s) error. See subclasses for details
@@ -645,25 +637,17 @@ def __str__(self):
suffix_str = self.args[0]
# If we passed a valid column name to __init__, change suffix_str
# to include all column aliases:
col_names = _c_alias.get(suffix_str, None)
if col_names is not None:
# now sort (first col name, then alias(es)):
sorted_names = [c for c in col_names if c in _c_names] + \
[c for c in col_names if c not in _c_names]
suffix_str = sorted_names[0].__repr__()
if len(sorted_names) > 1:
suffix_str += " (alias" if len(sorted_names) == 2 else " (aliases"
suffix_str += f": {', '.join(_.__repr__() for _ in sorted_names[1:])})"
sorted_names = get_column_names(suffix_str, sort=True)
if sorted_names:  # guard: suffix_str might not be a registered column name
    suffix_str = sorted_names[0].__repr__()
    if len(sorted_names) > 1:
        suffix_str += " (alias" if len(sorted_names) == 2 else " (aliases"
        suffix_str += f": {', '.join(_.__repr__() for _ in sorted_names[1:])})"
return f"{prefix_str} {suffix_str}"

def __repr__(self):
return self.__str__()


# the set of column names (no aliases. lazy loaded, see __getattr__):
_c_names: set[str]


class MissingColumn(InvalidColumn, AttributeError, KeyError):
"""MissingColumnError. It inherits also from AttributeError and
KeyError to be compliant with pandas and OpenQuake"""
@@ -675,19 +659,6 @@ def __init__(self, *names):
InvalidColumn.__init__(self, " vs. ".join(_.__repr__() for _ in names))
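With aliases provided by `get_column_names(..., sort=True)`, the error messages list the primary column name first, then its aliases. A sketch of the suffix built by `InvalidColumn.__str__`, again assuming a hypothetical column 'magnitude' with the single alias 'mag':

    from egsim.smtk.flatfile import MissingColumn

    err = MissingColumn("magnitude")
    # str(err) ends with: 'magnitude' (alias: 'mag')
    # (the message prefix comes from code not shown in this diff)
    print(err)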


def __getattr__(name):
"""lazy load global module variables from YAML"""
args = {'_c_alias': {}, '_c_default': {}, '_c_dtype': {},
'_c_rupture_params': set(), '_c_names': set()}
if name in args:
read_column_metadata(**{k[3:] :v for k, v in args.items()})
for n, v in args.items():
globals()[f'{n}'] = v
return globals()[name]
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")



# FIXME REMOVE LEGACY STUFF CHECK WITH GW:

# FIXME: remove columns checks will be done when reading the flatfile and
226 changes: 147 additions & 79 deletions egsim/smtk/flatfile/columns.py
@@ -10,6 +10,7 @@

# try to speed up yaml.safe_load (https://pyyaml.org/wiki/PyYAMLDocumentation):
from yaml import load as yaml_load

try:
from yaml import CSafeLoader as default_yaml_loader # faster, if available
except ImportError:
@@ -46,88 +47,155 @@ class ColumnDtype(Enum):
str = str, np.str_, np.object_


_ff_metadata_path = join(dirname(__file__), 'columns.yaml')
def get_rupture_param_columns() -> set[str]:
"""Return a set of strings with all column names (including aliases)
denoting a rupture parameter
"""
rup = set()
_extract_from_columns(load_from_yaml(), rupture_params=rup)
return rup
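A usage sketch (the tested name is illustrative):

    from egsim.smtk.flatfile.columns import get_rupture_param_columns

    rupture_cols = get_rupture_param_columns()
    # membership test, as done in EventContext.__getattr__:
    print("mag" in rupture_cols)  # True if "mag" denotes a rupture parameter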


# Column name and aliases, mapped to all their aliases
# The dict values will always include at least the column name itself:
_alias: dict[str, set[str]] = None # noqa


def read_column_metadata(*, names:set[str]=None,
rupture_params:set[str]=None,
sites_params: set[str] = None,
distances: set[str] = None,
imts: set[str] = None,
dtype:dict[str, Union[str, pd.CategoricalDtype]]=None,
alias:dict[str, set[str]]=None,
default:dict[str, Any]=None,
bounds:dict[str, dict[str, Any]]=None,
help:dict=None):
"""Put columns metadata stored in the YAML file into the passed function arguments.
:param names: set or None. If set, it will be populated with the names of
the flatfile columns registered in the YAML, aliases excluded
:param rupture_params: set or None. If set, it will be populated with the flatfile
columns that denote an OpenQuake rupture parameter
:param sites_params: set or None. If set, it will be populated with the flatfile
columns that denote an OpenQuake sites parameter
:param distances: set or None. If set, it will be populated with the flatfile
columns that denote an OpenQuake distance measure
:param imts: set or None. If set, it will be populated with the flatfile
columns that denote an OpenQuake intensity parameter
:param dtype: dict or None. If dict, it will be populated with the flatfile columns
and aliases mapped to their data type (a name of an item of
the enum :ref:`ColumnDtype` - or pandas `CategoricalDtype`)
:param alias: dict or None. If dict, it will be populated with the flatfile columns
and aliases mapped to the set of their aliases. A dict value might be therefore
keyed by more than one dict key, and contains at least its key
:param default: dict or None. If dict, it will be populated with the
flatfile columns and aliases mapped to their default, if defined
:param bounds: dict or None, of dict, it will be populated with the flatfile
columns and aliases mapped to a dict with keys "<=", "<" ">=", ">" mapped
in turn to a value
:param help: dict or None, if dict, it will be populated with all column names
and aliases mapped to their description
def get_column_names(column, sort=False) -> Union[set[str], list[str]]:
"""Return all possible names of the given column, as set of strings. If sort is
True, a list is returned where the first element is the primary column name (one of
the top-level keys defined in the YAML dict)
"""
global _alias
if _alias is None:
_alias = {}
_extract_from_columns(load_from_yaml(), alias=_alias)
names = _alias.get(column, set())
if not sort:
return names
else:
return [n for n in names if n in _columns] + \
       [n for n in names if n not in _columns]
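A sketch of the two return modes (the column and alias names are illustrative):

    from egsim.smtk.flatfile.columns import get_column_names

    # assuming 'magnitude' is registered in the YAML with alias 'mag':
    get_column_names("magnitude")             # -> {"magnitude", "mag"} (a set)
    get_column_names("magnitude", sort=True)  # -> ["magnitude", "mag"] (primary first)
    get_column_names("unknown")               # -> set() (unregistered names)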

def _upcast(val, dtype):
"""allow for some dtypes in certain cases"""
if dtype == ColumnDtype.float.name and isinstance(val, int):
return float(val)
elif dtype == ColumnDtype.datetime.name and isinstance(val, date):
return datetime(val.year, val.month, val.day)
return val

def get_dtypes_and_defaults() -> \
tuple[dict[str, Union[str, pd.CategoricalDtype]], dict[str, Any]]:
"""Return the column data types and defaults. Dict keys are all columns names
(including aliases) mapped to their data type or default. Columns with no data
type or default are not present.
"""
_dtype, _default = {}, {}
_extract_from_columns(load_from_yaml(), dtype=_dtype, default=_default)
return _dtype, _default
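These are the two dicts that `read_flatfile` forwards to `read_csv`. A quick inspection sketch (the printed values are illustrative):

    from egsim.smtk.flatfile.columns import get_dtypes_and_defaults

    dtypes, defaults = get_dtypes_and_defaults()
    # both dicts are keyed by column names and aliases, e.g.:
    print(dtypes.get("magnitude"))    # e.g. "float" (None if no dtype is defined)
    print(defaults.get("magnitude"))  # the default, or None if none is defined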


# YAML file path:
_ff_metadata_path = join(dirname(__file__), 'columns.yaml')
# cache storage of the data in the YAML:
_columns: dict[str, dict[str, Any]] = None # noqa


def load_from_yaml(cache=True) -> dict[str, dict[str, Any]]:
"""Loads the content of the associated YAML file with all columns
information and returns it as Python dict
:param cache: if True, a cache version will be returned (faster, but remember
that any change to the cached version will persist permanently!). Otherwise,
a new dict loaded from file (slower) will be returned
"""
global _columns
if cache and _columns:
return _columns
with open(_ff_metadata_path) as fpt:
for c_name, props in yaml_load(fpt, default_yaml_loader).items():
if names is not None:
names.add(c_name)
aliases = props.get('alias', [])
if isinstance(aliases, str):
aliases = {aliases}
else:
aliases = set(aliases)
aliases.add(c_name)
for name in aliases:
if 'type' in props:
ctype = ColumnType[props['type']]
if rupture_params is not None and ctype == ColumnType.rupture:
rupture_params.add(name)
if sites_params is not None and ctype == ColumnType.sites:
sites_params.add(name)
if distances is not None and ctype == ColumnType.distance:
distances.add(name)
if imts is not None and ctype == ColumnType.intensity:
imts.add(name)
if alias is not None:
alias[name] = aliases
if dtype is not None and 'dtype' in props:
dtype[name] = props['dtype']
if isinstance(dtype[name], (list, tuple)):
dtype[name] = pd.CategoricalDtype(dtype[name])
if default is not None and 'default' in props:
default[name] = _upcast(props['default'], props['dtype'])
if bounds is not None:
_bounds = {k: _upcast(props[k], props['dtype'])
for k in ["<", "<=", ">", ">="]
if k in props}
if _bounds:
bounds[name] = _bounds
if help is not None and props.get('help', ''):
help[name] = props['help']
_cols = yaml_load(fpt, default_yaml_loader)
if cache:
_columns = _cols
return _cols
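The returned dict mirrors the YAML layout: top-level keys are the primary column names, each mapped to the properties consumed by `_extract_from_columns` below ('alias', 'type', 'dtype', 'default', the bound operators, 'help'). A purely illustrative entry, shown as the Python dict it would deserialize to:

    # hypothetical content; the actual columns.yaml is not shown in this diff:
    columns = {
        "magnitude": {
            "alias": "mag",     # a string or list of strings
            "type": "rupture",  # a ColumnType member name
            "dtype": "float",   # a ColumnDtype member name (a list means categorical)
            ">": 0,             # bound, upcast to the column dtype via _upcast
            "help": "the event magnitude",
        },
    }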


def _extract_from_columns(columns: dict[str, dict[str, Any]], *,
rupture_params:set[str]=None,
sites_params: set[str] = None,
distances: set[str] = None,
imts: set[str] = None,
dtype:dict[str, Union[str, pd.CategoricalDtype]]=None,
alias:dict[str, set[str]]=None,
default:dict[str, Any]=None,
bounds:dict[str, dict[str, Any]]=None,
help:dict=None):
"""Extract data from `columns` (the metadata stored in the YAML file)
and put it into the passed function arguments that are not missing / None.
:param rupture_params: set or None. If set, it will be populated with all flatfile
columns (aliases included) that denote an OpenQuake rupture parameter
:param sites_params: set or None. If set, it will be populated with all flatfile
columns (aliases included) that denote an OpenQuake sites parameter
:param distances: set or None. If set, it will be populated with all flatfile
columns (aliases included) that denote an OpenQuake distance measure
:param imts: set or None. If set, it will be populated with all flatfile
columns (aliases included) that denote an OpenQuake intensity parameter
:param dtype: dict or None. If dict, it will be populated with all flatfile columns
(aliases included) mapped to their data type (a name of an item of
the enum :ref:`ColumnDtype` - or pandas `CategoricalDtype`). Columns with no
data type will not be present
:param alias: dict or None. If dict, it will be populated with all flatfile columns
(aliases included) mapped to their aliases. E.g., a column that can take N
additional names will have N+1 entries, all of them mapped to the same set of
N+1 names. All columns will be present: if a column has no alias, it will be
mapped to itself (1-element set).
:param default: dict or None. If dict, it will be populated with all flatfile
columns (aliases included) mapped to their default, if defined. Columns with no
default will not be present
:param bounds: dict or None. If dict, it will be populated with all flatfile
columns (aliases included) mapped to a dict with keys "<=", "<", ">=", ">" mapped
in turn to a value consistent with the column dtype. Columns with no bounds
will not be present
:param help: dict or None. If dict, it will be populated with all flatfile columns
(aliases included) mapped to their description. Columns with no help will not be
present
"""
check_type = rupture_params is not None or sites_params is not None \
or distances is not None or imts is not None
for c_name, props in columns.items():
aliases = props.get('alias', [])
if isinstance(aliases, str):
aliases = {aliases}
else:
aliases = set(aliases)
aliases.add(c_name)
for name in aliases:
if check_type and 'type' in props:
ctype = ColumnType[props['type']]
if rupture_params is not None and ctype == ColumnType.rupture:
rupture_params.add(name)
if sites_params is not None and ctype == ColumnType.sites:
sites_params.add(name)
if distances is not None and ctype == ColumnType.distance:
distances.add(name)
if imts is not None and ctype == ColumnType.intensity:
imts.add(name)
if alias is not None:
alias[name] = aliases
if dtype is not None and 'dtype' in props:
dtype[name] = props['dtype']
if isinstance(dtype[name], (list, tuple)):
dtype[name] = pd.CategoricalDtype(dtype[name])
if default is not None and 'default' in props:
default[name] = _upcast(props['default'], props['dtype'])
if bounds is not None:
_bounds = {k: _upcast(props[k], props['dtype'])
for k in ["<", "<=", ">", ">="]
if k in props}
if _bounds:
bounds[name] = _bounds
if help is not None and props.get('help', ''):
help[name] = props['help']
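Because the function fills only the containers it is given, callers can collect several pieces of metadata in a single pass over the YAML dict, e.g.:

    from egsim.smtk.flatfile.columns import _extract_from_columns, load_from_yaml

    alias: dict = {}
    dtype: dict = {}
    # one pass over the YAML dict fills both containers:
    _extract_from_columns(load_from_yaml(), alias=alias, dtype=dtype)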


def _upcast(val, dtype):
"""allow for some dtypes in certain cases"""
if dtype == ColumnDtype.float.name and isinstance(val, int):
return float(val)
elif dtype == ColumnDtype.datetime.name and isinstance(val, date):
return datetime(val.year, val.month, val.day)
return val
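`_upcast` tolerates two common YAML shorthands: an int for a float column, and a date for a datetime column; anything else is returned unchanged. For example:

    from datetime import date, datetime

    from egsim.smtk.flatfile.columns import _upcast

    assert _upcast(3, "float") == 3.0  # int -> float
    assert _upcast(date(2023, 9, 15), "datetime") == datetime(2023, 9, 15)  # date -> datetime
    assert _upcast("x", "str") == "x"  # other cases: unchanged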