diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c0746e188..b43b6c847 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -34,7 +34,7 @@ repos: entry: python3 -m mypy --config-file pyproject.toml language: system types: [python] - exclude: "use_cases|tests|cyclops/(process|models|monitor|report/plot)" + exclude: "use_cases|tests|cyclops/(models|monitor|report/plot)" - repo: local hooks: diff --git a/cyclops/process/aggregate.py b/cyclops/process/aggregate.py index cadfa891e..94d9438ff 100644 --- a/cyclops/process/aggregate.py +++ b/cyclops/process/aggregate.py @@ -8,15 +8,6 @@ import pandas as pd from cyclops.process.clean import dropna_rows -from cyclops.process.column_names import ( - EVENT_NAME, - EVENT_VALUE, - RESTRICT_TIMESTAMP, - START_TIMESTAMP, - START_TIMESTEP, - STOP_TIMESTAMP, - TIMESTEP, -) from cyclops.process.constants import ALL, FIRST, LAST, MEAN, MEDIAN from cyclops.process.feature.vectorized import Vectorized from cyclops.process.impute import AggregatedImputer, numpy_2d_ffill @@ -32,10 +23,15 @@ AGGFUNCS = {MEAN: np.mean, MEDIAN: np.median} +RESTRICT_TIMESTAMP = "restrict_timestamp" +WINDOW_START_TIMESTAMP = "window_start_timestamp" +WINDOW_STOP_TIMESTAMP = "window_stop_timestamp" +START_TIMESTEP = "start_timestep" +TIMESTEP = "timestep" class Aggregator: - """Equal-spaced aggregation, or binning, of temporal data. + """Equal-spaced aggregation, or binning, of time-series data. Computing aggregation metadata is expensive and should be done sparingly. @@ -66,7 +62,7 @@ class Aggregator: def __init__( self, - aggfuncs: Dict[str, Union[str, Callable]], + aggfuncs: Dict[str, Union[str, Callable[[pd.Series], Any]]], timestamp_col: str, time_by: Union[str, List[str]], agg_by: Union[str, List[str]], @@ -78,7 +74,6 @@ def __init__( """Init.""" if agg_meta_for is not None: LOGGER.warning("Calculation of aggregation metadata slows aggregation.") - self.aggfuncs = self._process_aggfuncs(aggfuncs) self.timestamp_col = timestamp_col self.time_by = to_list(time_by) @@ -88,7 +83,6 @@ def __init__( self.agg_meta_for = to_list_optional(agg_meta_for) self.window_times = pd.DataFrame() # Calculated when given the data self.imputer = imputer - # Parameter checking if self.agg_meta_for is not None and not set(self.agg_meta_for).issubset( set(self.aggfuncs), @@ -96,37 +90,14 @@ def __init__( raise ValueError( "Cannot compute meta for a column not being aggregated.", ) - if self.window_duration is not None: divided = self.window_duration / self.timestep_size if divided != int(divided): raise ValueError("Window duration be divisible by bucket size.") - def get_timestamp_col(self) -> str: - """Get timestamp column. - - Returns - ------- - str - Name of timestamp column. - - """ - return self.timestamp_col - - def get_aggfuncs(self) -> Dict[str, Callable]: - """Get aggregation functions. - - Returns - ------- - dict - Aggregation functions. - - """ - return self.aggfuncs - def _process_aggfuncs( self, - aggfuncs: Dict[str, Union[str, Callable]], + aggfuncs: Dict[str, Union[str, Callable[[pd.Series], Any]]], ) -> Dict[str, Any]: """Process aggregation functions for respective columns. @@ -146,7 +117,7 @@ def _process_aggfuncs( f"""Aggfunc string {aggfunc} not supported. Supporting: {','.join(AGGFUNCS)}""", ) - aggfuncs[col] = AGGFUNCS[aggfunc] + aggfuncs[col] = AGGFUNCS[aggfunc] # type: ignore elif callable(aggfunc): pass else: @@ -187,11 +158,9 @@ def _restrict_by_timestamp(self, data: pd.DataFrame) -> pd.DataFrame: """ data = data.merge(self.window_times, on=self.time_by, how="left") - - cond = (data[self.timestamp_col] >= data[START_TIMESTAMP]) & ( - data[self.timestamp_col] < data[STOP_TIMESTAMP] + cond = (data[self.timestamp_col] >= data[WINDOW_START_TIMESTAMP]) & ( + data[self.timestamp_col] < data[WINDOW_STOP_TIMESTAMP] ) - # Keep if no match was made (i.e., no restriction performed) cond = cond | (data[self.timestamp_col].isnull()) return data[cond] @@ -200,7 +169,7 @@ def _use_provided_window( self, window_time: pd.DataFrame, default_time: pd.DataFrame, - warning_args: Tuple, + warning_args: Tuple[str, str], ) -> pd.DataFrame: """Process a window start/stop time. @@ -228,7 +197,6 @@ def _use_provided_window( ), *warning_args, ) - # Default non-existent to earliest time. window_time = default_time.join(window_time) inds = window_time[RESTRICT_TIMESTAMP].isna() @@ -236,6 +204,7 @@ def _use_provided_window( inds ] window_time = window_time.drop(self.timestamp_col, axis=1) + return window_time def _compute_window_start( @@ -264,7 +233,6 @@ def _compute_window_start( .groupby(self.time_by, sort=False) .agg({self.timestamp_col: "min"}) ) - if window_start_time is None: # Use earliest times earliest_time = earliest_time.rename( @@ -281,6 +249,7 @@ def _compute_window_start( ) self._check_start_stop_window_ts(window_start_time) + return window_start_time def _compute_window_stop( @@ -312,14 +281,12 @@ def _compute_window_stop( raise ValueError( "Cannot provide window_stop_time if window_duration was set.", ) - if self.window_duration is not None: # Use window duration to compute the stop times for each group window_stop_time = window_start_time.copy() window_stop_time[RESTRICT_TIMESTAMP] += pd.Timedelta( hours=self.window_duration, ) - else: # Take the latest timestamp for each time_by group latest_time = ( @@ -327,7 +294,6 @@ def _compute_window_stop( .groupby(self.time_by, sort=False) .agg({self.timestamp_col: "max"}) ) - if window_stop_time is None: # Use latest times latest_time = latest_time.rename( @@ -342,8 +308,8 @@ def _compute_window_stop( latest_time, ("stop", "latest"), ) - self._check_start_stop_window_ts(window_stop_time) + return window_stop_time def _compute_window_times( @@ -351,7 +317,7 @@ def _compute_window_times( data: pd.DataFrame, window_start_time: Optional[pd.DataFrame] = None, window_stop_time: Optional[pd.DataFrame] = None, - ): + ) -> pd.DataFrame: """Compute the start/stop timestamps for each time_by window. Parameters @@ -374,23 +340,22 @@ def _compute_window_times( data, window_start_time=window_start_time, ) - # Compute window stop time window_stop_time = self._compute_window_stop( data, window_start_time, window_stop_time=window_stop_time, ) - # Combine and compute additional information window_start_time = window_start_time.rename( - {RESTRICT_TIMESTAMP: START_TIMESTAMP}, + {RESTRICT_TIMESTAMP: WINDOW_START_TIMESTAMP}, axis=1, ) window_stop_time = window_stop_time.rename( - {RESTRICT_TIMESTAMP: STOP_TIMESTAMP}, + {RESTRICT_TIMESTAMP: WINDOW_STOP_TIMESTAMP}, axis=1, ) + return window_start_time.join(window_stop_time) def _compute_timestep(self, group: pd.DataFrame) -> pd.DataFrame: @@ -408,7 +373,7 @@ def _compute_timestep(self, group: pd.DataFrame) -> pd.DataFrame: """ loc = tuple(group[self.time_by].values[0]) - start = self.window_times.loc[loc][START_TIMESTAMP] + start = self.window_times.loc[loc][WINDOW_START_TIMESTAMP] group[TIMESTEP] = (group[self.timestamp_col] - start) / pd.Timedelta( hours=self.timestep_size, ) @@ -438,7 +403,6 @@ def _compute_agg_meta(self, group: pd.DataFrame) -> pd.DataFrame: }, dropna=False, ) - keep = [] for col in self.agg_meta_for: # type: ignore meta[col + "_count"] = meta[(col, "len")] @@ -446,9 +410,9 @@ def _compute_agg_meta(self, group: pd.DataFrame) -> pd.DataFrame: meta[(col, "")] / meta[(col, "len")] ) keep.extend([col + "_count", col + "_null_fraction"]) - meta = meta[keep] meta.columns = meta.columns.droplevel(1) + return meta def _compute_aggregation(self, group: pd.DataFrame) -> pd.DataFrame: @@ -466,7 +430,6 @@ def _compute_aggregation(self, group: pd.DataFrame) -> pd.DataFrame: """ group = group.groupby(TIMESTEP, sort=False, dropna=False) - # Compute aggregation meta if self.agg_meta_for is not None: agg_meta = self._compute_agg_meta(group) @@ -475,11 +438,8 @@ def _compute_aggregation(self, group: pd.DataFrame) -> pd.DataFrame: if self.imputer is not None and self.imputer.intra is not None: group = self.imputer.intra(group) - AggregatedImputer(group) - group = group.agg(self.aggfuncs) - # Include aggregation meta if agg_meta is not None: group = group.join(agg_meta) @@ -497,14 +457,12 @@ def _aggregate( sort=False, group_keys=False, ).apply(self._compute_timestep) - # Aggregate has_inter_imputer = True if self.imputer is None: has_inter_imputer = False elif self.imputer.inter is None: has_inter_imputer = False - if self.agg_meta_for is None and not has_inter_imputer: # EFFICIENT - Can perform if no imputation or metadata calculation is done grouped = data_with_timesteps.groupby(self.agg_by + [TIMESTEP], sort=False) @@ -513,20 +471,20 @@ def _aggregate( # INEFFICIENT - Perform with a custom function to allow addded functionality grouped = data_with_timesteps.groupby(self.agg_by, sort=False) aggregated = grouped.apply(self._compute_aggregation) - if not include_timestep_start: return aggregated - # Get the start timestamp for each timestep aggregated = aggregated.reset_index().set_index(self.time_by) - - aggregated = aggregated.join(self.window_times[START_TIMESTAMP]) - aggregated[START_TIMESTEP] = aggregated[START_TIMESTAMP] + pd.to_timedelta( + aggregated = aggregated.join(self.window_times[WINDOW_START_TIMESTAMP]) + aggregated[START_TIMESTEP] = aggregated[ + WINDOW_START_TIMESTAMP + ] + pd.to_timedelta( aggregated[TIMESTEP] * self.timestep_size, unit="h", ) - aggregated = aggregated.drop(START_TIMESTAMP, axis=1) + aggregated = aggregated.drop(WINDOW_START_TIMESTAMP, axis=1) aggregated = aggregated.reset_index() + return aggregated.set_index(self.agg_by + [TIMESTEP]) @time_function @@ -536,7 +494,7 @@ def __call__( window_start_time: Optional[pd.DataFrame] = None, window_stop_time: Optional[pd.DataFrame] = None, include_timestep_start: bool = True, - ) -> Tuple[pd.DataFrame, pd.DataFrame]: + ) -> pd.DataFrame: """Aggregate. The window start and stop times can be used to cut short the timeseries. @@ -569,34 +527,29 @@ def __call__( # Parameter checking if not isinstance(data, pd.DataFrame): raise ValueError("Data to aggregate must be a pandas.DataFrame.") - has_columns( data, list(set([self.timestamp_col] + self.time_by + self.agg_by)), raise_error=True, ) - if has_columns(data, TIMESTEP): - raise ValueError(f"Inputted data cannot have a column called {TIMESTEP}.") - + raise ValueError(f"Input data cannot have a column called {TIMESTEP}.") # Ensure the timestamp column is a timestamp. Drop null times (NaT). is_timestamp_series(data[self.timestamp_col], raise_error=True) data = dropna_rows(data, self.timestamp_col) - # Compute start/stop timestamps self.window_times = self._compute_window_times( data, window_start_time=window_start_time, window_stop_time=window_stop_time, ) - # Restrict the data according to the start/stop data = self._restrict_by_timestamp(data) return self._aggregate(data, include_timestep_start=include_timestep_start) @time_function - def vectorize(self, aggregated: pd.DataFrame) -> np.ndarray: + def vectorize(self, aggregated: pd.DataFrame) -> Vectorized: """Vectorize aggregated data. Parameters @@ -615,9 +568,7 @@ def vectorize(self, aggregated: pd.DataFrame) -> np.ndarray: raise NotImplementedError( "Cannot currently vectorize data aggregated with no window duration.", ) - num_timesteps = int(self.window_duration / self.timestep_size) - # Parameter checking has_columns(aggregated, list(self.aggfuncs.keys()), raise_error=True) if not aggregated.index.names == self.agg_by + [TIMESTEP]: @@ -632,14 +583,12 @@ def vectorize(self, aggregated: pd.DataFrame) -> np.ndarray: names=index, ) vectorized = aggregated.reindex(idx) - # Calculate new shape and indexes shape = [ len(vectorized.index.levels[i]) for i in range(len(vectorized.index.levels)) ] indexes = [list(self.aggfuncs.keys())] indexes.extend([ind.values for ind in vectorized.index.levels]) - # Reshape and vectorize vectorized = np.stack( [vectorized[aggfunc].values.reshape(shape) for aggfunc in self.aggfuncs], @@ -656,9 +605,9 @@ def aggregate_values( data: pd.DataFrame, window_start_time: Optional[pd.DataFrame] = None, window_stop_time: Optional[pd.DataFrame] = None, - start_bound_func: Optional[Callable] = None, - stop_bound_func: Optional[Callable] = None, - ): + start_bound_func: Optional[Callable[[pd.Series], pd.Series]] = None, + stop_bound_func: Optional[Callable[[pd.Series], pd.Series]] = None, + ) -> pd.DataFrame: """Aggregate temporal values. The temporal values are restricted by start/stop and then aggregated. @@ -675,7 +624,7 @@ def aggregate_values( window_duration was set. start_bound_func : Optional[Callable[[pd.Series], pd.Series]], optional A function to bound the start timestamp values, by default None - stop_bound_func : Optional[Callable], optional + stop_bound_func : Optional[Callable[[pd.Series], pd.Series]], optional A function to bound the start timestamp values, by default None Returns @@ -697,23 +646,21 @@ def aggregate_values( window_start_time=window_start_time, window_stop_time=window_stop_time, ) - # Restrict the data according to the start/stop data = self._restrict_by_timestamp(data) - # Filter the data based on bounds on start/stop data = start_bound_func(data) if start_bound_func else data data = stop_bound_func(data) if stop_bound_func else data - grouped = data.groupby(self.agg_by, sort=False) + return grouped.agg(self.aggfuncs) def tabular_as_aggregated( tab: pd.DataFrame, index: str, - var_name: str = EVENT_NAME, - value_name: str = EVENT_VALUE, + var_name: str, + value_name: str, strategy: str = ALL, num_timesteps: Optional[int] = None, sort: bool = True, @@ -726,9 +673,9 @@ def tabular_as_aggregated( Tabular data. index: str Index column name. - var_name: str, optional + var_name: str The name of the resultant column containing the original tabular column names. - value_name: str, optional + value_name: str The name of the resultant column containing the tabular values. strategy: str Strategy to fake aggregation. E.g., FIRST sets a first timestep to the value, @@ -749,33 +696,28 @@ def tabular_as_aggregated( raise ValueError( f"Strategy not recognized. Must be in: {', '.join(supported)}.", ) - if num_timesteps is None and strategy in [LAST, ALL]: raise ValueError("Must specify num_timesteps for this strategy.") - tab = tab.set_index(index) tab = tab.melt(var_name=var_name, value_name=value_name, ignore_index=False) tab = tab.reset_index() - # Set value in the first timestep if strategy == FIRST: tab[TIMESTEP] = 0 - # Set value in the last timestep elif strategy == LAST: assert num_timesteps is not None tab[TIMESTEP] = num_timesteps - 1 - # Repeat value across all timesteps elif strategy == ALL: assert num_timesteps is not None tab = pd.concat( [t.assign(**{TIMESTEP: i}) for i, t in enumerate([tab] * num_timesteps)], ) - tab = tab.set_index([index, var_name, TIMESTEP]) if sort: return tab.sort_index() + return tab @@ -784,7 +726,7 @@ def timestamp_ffill_agg( num_timesteps: int, val: float = 1, fill_nan: Optional[float] = None, -): +) -> np.typing.ArrayLike: """Perform single-value aggregation with fill forward functionality given timesteps. If a timestep is negative, it is treated as occuring before the regular window and @@ -815,24 +757,18 @@ def timestamp_ffill_agg( shape = (len(timesteps), num_timesteps) arr = np.empty(shape) arr[:, :] = np.NaN - before = (timesteps < 0).values after = (timesteps >= 0).values - # Predict 1 from beginning to end arr[before] = val - # Predict 1 in a specific timestep rows = np.where(after)[0] cols = timesteps[after].values.astype(int) - before_end = cols < num_timesteps rows = rows[before_end] cols = cols[before_end] arr[rows, cols] = val - arr = numpy_2d_ffill(arr) - if fill_nan is not None: arr = np.nan_to_num(arr, nan=fill_nan) diff --git a/cyclops/process/clean.py b/cyclops/process/clean.py index 2450c4f3a..4a648ea7d 100644 --- a/cyclops/process/clean.py +++ b/cyclops/process/clean.py @@ -5,16 +5,7 @@ import pandas as pd -from cyclops.process.column_names import ( - ENCOUNTER_ID, - EVENT_CATEGORY, - EVENT_NAME, - EVENT_TIMESTAMP, - EVENT_VALUE, - EVENT_VALUE_UNIT, -) from cyclops.process.constants import ( - EMPTY_STRING, NEGATIVE_RESULT_TERMS, POSITIVE_RESULT_TERMS, ) @@ -27,7 +18,7 @@ strip_whitespace, to_lower, ) -from cyclops.process.util import assert_has_columns, gather_columns, log_counts_step +from cyclops.process.util import log_df_counts from cyclops.utils.log import setup_logging from cyclops.utils.profile import time_function @@ -37,152 +28,10 @@ setup_logging(print_level="INFO", logger=LOGGER) -UNSUPPORTED = [ - "urinalysis", - "alp", - "alt", - "ast", - "d-dimer", - "ldh", - "serum osmolality", - "tsh", - "urine osmolality", -] - - -def combine_events(event_data: Union[pd.DataFrame, List[pd.DataFrame]]) -> pd.DataFrame: - """Gather event data from multiple dataframes into a single one. - - Events can be in multiple raw dataframes like labs, vitals, etc. This - function takes in multiple dataframes and gathers all events into a single - dataframe. If just a single dataframe is passed, it returns it back. - - Parameters - ---------- - event_data: pandas.DataFrame or list of pandas.DataFrame - Raw event data. - - Returns - ------- - pandas.DataFrame - Combined event data. - - """ - - def add_events(events: pd.DataFrame, events_to_add: pd.DataFrame) -> pd.DataFrame: - return pd.concat( - [ - events, - events_to_add, - ], - ignore_index=True, - axis=0, - ) - - events = pd.DataFrame() - if isinstance(event_data, pd.DataFrame): - event_data = [event_data] - for event_dataframe in event_data: - events = add_events(events, event_dataframe) - - return events - - -@assert_has_columns([ENCOUNTER_ID]) -def convert_to_events( - data: pd.DataFrame, - event_name: str, - event_category: str, - timestamp_col: str, - value_col: Optional[str] = None, -) -> pd.DataFrame: - """Convert dataframe with just timestamps into events. - - Any event like admission, discharge, transfer, etc. can be converted to the - common events dataframe format with 'encounter_id', 'event_name', 'event_timestamp', - and 'event_value' columns. The input data in this case does not have an explicit - event name and hence we assign it. Like for e.g. admission. - - Parameters - ---------- - data: pandas.DataFrame - Raw data with some timestamps denoting an event. - event_name: str - Event name to give, added as a new column. - event_category: str - Event name to give, added as a new column. - timestamp_col: str - Name of the column in the incoming dataframe that has the timestamp. - value_col: str, optional - Name of the column in the incoming dataframe that has potential event values. - - Returns - ------- - pandas.DataFrame - Events in the converted format. - - """ - if value_col: - cols = [ENCOUNTER_ID, timestamp_col, value_col] - else: - cols = [ENCOUNTER_ID, timestamp_col] - events = gather_columns(data, cols) - if EVENT_VALUE not in events: - events[EVENT_VALUE] = EMPTY_STRING - events = events.rename( - columns={timestamp_col: EVENT_TIMESTAMP, value_col: EVENT_VALUE}, - ) - events[EVENT_NAME] = event_name - events[EVENT_CATEGORY] = event_category - - return events - - -def is_supported(event_name: str) -> bool: - """Check if event name is supported. - - Processing events involves data cleaning, and hence some event names - are currently removed until they are supported. - - Parameters - ---------- - event_name: str - Name of event. - - Returns - ------- - bool - If supported return True, else False. - - """ - return event_name not in UNSUPPORTED - - -@assert_has_columns([EVENT_NAME]) -def drop_unsupported(data: pd.DataFrame) -> pd.DataFrame: - """Drop events currently not supported for processing. - - Parameters - ---------- - data: pandas.DataFrame - Input data. - - Returns - ------- - pandas.DataFrame - Output data with dropped events (rows) which had unsupported events. - - """ - data = data.loc[data[EVENT_NAME].apply(is_supported)] - log_counts_step(data, "Drop unsupported events...", columns=True) - - return data - - def normalize_names(names: pd.Series) -> pd.Series: - """Normalize event names. + """Normalize column names such that they can be used as features. - Perform basic cleaning/house-keeping of event names. + Perform basic cleaning/house-keeping of column names. e.g. remove parantheses from the measurement-name, convert to lower-case. @@ -198,35 +47,13 @@ def normalize_names(names: pd.Series) -> pd.Series: """ names = names.apply(to_lower) - names = names.apply(remove_text_in_parentheses) return names.str.strip() -def normalize_categories(categories: pd.Series) -> pd.Series: - """Normalize event category names. - - Perform basic cleaning/house-keeping of event category names. - e.g. convert to lower-case. - - Parameters - ---------- - categories: pandas.Series - Categories. - - Returns - ------- - pandas.Series - Normalized event categories. - - """ - categories = categories.apply(to_lower) - return categories.str.strip() - - def normalize_values(values: pd.Series) -> pd.Series: - """Normalize event values. + """Normalize value columns such that they can be used as features. - Perform basic cleaning/house-keeping of event values. + Perform basic cleaning/house-keeping of column values. e.g. fill empty strings with NaNs, convert some strings to equivalent boolean or numeric, so they can be used as features. @@ -260,7 +87,6 @@ def normalize_units(units: pd.Series) -> pd.Series: """Normalize event value units. Perform basic cleaning/house-keeping of event value units. - e.g. converting units to SI. Parameters ---------- @@ -280,7 +106,12 @@ def normalize_units(units: pd.Series) -> pd.Series: @time_function -def normalize_events(data) -> pd.DataFrame: +def normalize_events( + data: pd.DataFrame, + event_name_col: str, + event_value_col: Optional[str] = None, + event_value_unit_col: Optional[str] = None, +) -> pd.DataFrame: """Pre-process and normalize (clean) raw event data. Parameters @@ -296,20 +127,16 @@ def normalize_events(data) -> pd.DataFrame: """ data = data.copy() - log_counts_step(data, "Cleaning raw event data...", columns=True) + log_df_counts(data, event_name_col, "Cleaning raw event data...", columns=True) data = data.infer_objects() - data[EVENT_NAME] = normalize_names(data[EVENT_NAME]) - data = drop_unsupported(data) - - if data[EVENT_VALUE].dtypes == object: - data[EVENT_VALUE] = normalize_values(data[EVENT_VALUE]) - log_counts_step(data, "Normalized values...", columns=True) + data[event_name_col] = normalize_names(data[event_name_col]) - if EVENT_VALUE_UNIT in list(data.columns): - data[EVENT_VALUE_UNIT] = normalize_units(data[EVENT_VALUE_UNIT]) + if event_value_col and data[event_value_col].dtypes == object: + data[event_value_col] = normalize_values(data[event_value_col]) + log_df_counts(data, event_name_col, "Normalized values...", columns=True) - if EVENT_CATEGORY in list(data.columns): - data[EVENT_CATEGORY] = normalize_categories(data[EVENT_CATEGORY]) + if event_value_unit_col and event_value_unit_col in list(data.columns): + data[event_value_unit_col] = normalize_units(data[event_value_unit_col]) return data diff --git a/cyclops/process/column_names.py b/cyclops/process/column_names.py deleted file mode 100644 index b6ab73477..000000000 --- a/cyclops/process/column_names.py +++ /dev/null @@ -1,49 +0,0 @@ -"""Column names to use across datasets, used for processing.""" - -ENCOUNTER_ID = "encounter_id" -SUBJECT_ID = "subject_id" -HOSPITAL_ID = "hospital_id" -ADMIT_TIMESTAMP = "admit_timestamp" -DISCHARGE_TIMESTAMP = "discharge_timestamp" -DISCHARGE_DISPOSITION = "discharge_disposition" -CARE_UNIT = "care_unit" -READMISSION = "readmission" -TRIAGE_LEVEL = "triage_level" -SCU_ADMIT_TIMESTAMP = "scu_admit_timestamp" -SCU_DISCHARGE_TIMESTAMP = "scu_discharge_timestamp" - -AGE = "age" -SEX = "sex" -CITY = "city" -PROVINCE = "province" -COUNTRY = "country" -LANGUAGE = "language" -DATE_OF_DEATH = "dod" -TOTAL_COST = "total_cost" -YEAR = "year" - -DIAGNOSIS_CODE = "diagnosis_code" -DIAGNOSIS_TITLE = "diagnosis_title" -DIAGNOSIS_VERSION = "diagnosis_version" -DIAGNOSIS_TYPE = "diagnosis_type" -DIAGNOSIS_TRAJECTORY = "diagnosis_trajectory" - -ER_ADMIT_TIMESTAMP = "er_admit_timestamp" -ER_DISCHARGE_TIMESTAMP = "er_discharge_timestamp" -LENGTH_OF_STAY_IN_ER = "length_of_stay_in_er" -MORTALITY_IN_HOSPITAL = "mortality_in_hospital" - -EVENT_NAME = "event_name" -EVENT_VALUE = "event_value" -EVENT_VALUE_UNIT = "event_value_unit" -EVENT_TIMESTAMP = "event_timestamp" -EVENT_CATEGORY = "event_category" - -# Aggregation. -START_TIMESTAMP = "start" -STOP_TIMESTAMP = "stop" -START_TIMESTEP = "timestep_start" -RESTRICT_TIMESTAMP = "restrict_timestamp" - -REFERENCE_RANGE = "reference_range" -TIMESTEP = "timestep" diff --git a/cyclops/process/constants.py b/cyclops/process/constants.py index 0c3ac549a..4cc178a6c 100644 --- a/cyclops/process/constants.py +++ b/cyclops/process/constants.py @@ -71,28 +71,9 @@ LAST = "last" ALL = "all" -# GEMINI Admin. -THPM = "THPM" -SBK = "SBK" -UHNTG = "UHNTG" -SMH = "SMH" -UHNTW = "UHNTW" -THPC = "THPC" -PMH = "PMH" -MSH = "MSH" -HOSPITAL_ID_MAP = { - THPM: 0, - SBK: 1, - UHNTG: 2, - SMH: 3, - UHNTW: 4, - THPC: 5, - PMH: 6, - MSH: 7, -} -MORTALITY_DISCHARGE_DISPOSITION_CODES = [7, 72, 73, 74] # Diagnostic codes (ICD-10). +DIAGNOSIS_TRAJECTORY = "diagnosis_trajectory" TRAJECTORIES = { "Certain infectious and parasitic diseases": ("A00", "B99"), "Neoplasms": ("C00", "D49"), @@ -132,58 +113,6 @@ "Z99", ), } - -# GEMINI labs. -DRUG_SCREEN = [ - "amitriptyline", - "amphetamine", - "barbiturates", - "barbiturates_scn", - "barbiturates_and_sedatives_blood", - "benzodiazepine_scn", - "benzodiazepines_screen", - "cannabinoids", - "clozapine", - "cocaine", - "cocaine_metabolite", - "codeine", - "cocaine_metabolite", - "codeine_metabolite_urine", - "desipramine", - "dextromethorphan", - "dim_per_dip_metabolite", - "dimen_per_diphenhydramine", - "doxepin", - "ephedrine_per_pseudo", - "fluoxetine", - "hydrocodone", - "hydromorphone", - "imipramine", - "lidocaine", - "mda_urine", - "mdma_ecstacy", - "methadone", - "meperidine_urine", - "methadone_metabolite_urine", - "methamphetamine", - "morphine", - "morphine_metabolite_urine", - "nortriptyline", - "olanzapine_metabolite_u", - "olanzapine_urine", - "opiates_urine", - "oxycodone", - "oxycodone_cobas", - "oxycodone_metabolite", - "phenylpropanolamine", - "propoxyphene", - "sertraline", - "trazodone", - "trazodone_metabolite", - "tricyclics_scn", - "venlafaxine", - "venlafaxine_metabolite", -] EMPTY_STRING = "" UNDERSCORE = "_" NEGATIVE_RESULT_TERMS = [ diff --git a/cyclops/process/diagnoses.py b/cyclops/process/diagnoses.py index 1c301cc9e..8b53e5cf0 100644 --- a/cyclops/process/diagnoses.py +++ b/cyclops/process/diagnoses.py @@ -2,12 +2,11 @@ import logging import re -from typing import Dict, Optional +from typing import Dict, Optional, Tuple import pandas as pd -from cyclops.process.column_names import DIAGNOSIS_TRAJECTORY -from cyclops.process.constants import EMPTY_STRING, TRAJECTORIES +from cyclops.process.constants import DIAGNOSIS_TRAJECTORY, EMPTY_STRING, TRAJECTORIES from cyclops.utils.log import setup_logging @@ -81,14 +80,16 @@ def get_numeric(code: str) -> str: return re.sub("[^0-9]", EMPTY_STRING, code) -def get_icd_category(code: str, trajectories: dict, raise_err: bool = False) -> str: +def get_icd_category( + code: str, trajectories: Dict[str, Tuple[str, str]], raise_err: bool = False +) -> str: """Get ICD10 category. Parameters ---------- code: str Input diagnosis code. - trajectories: dict + trajectories: Dict[str, Tuple[str, str]] Dictionary mapping of ICD10 trajectories. raise_err: Flag to raise error if code cannot be converted (for debugging.) @@ -121,7 +122,7 @@ def get_icd_category(code: str, trajectories: dict, raise_err: bool = False) -> def process_diagnoses( series: pd.Series, - trajectories: Optional[Dict] = None, + trajectories: Optional[Dict[str, Tuple[str, str]]] = None, ) -> pd.Series: """Process diagnoses data (codes) into trajectories. @@ -129,7 +130,7 @@ def process_diagnoses( ---------- series: pd.Series Diagnosis code data. - trajectories: dict, optional + trajectories: Dict[str, Tuple[str, str]], optional Mapping from code to trajectory. Returns diff --git a/cyclops/process/feature/feature.py b/cyclops/process/feature/feature.py index 6ae3bb71b..db87dc223 100644 --- a/cyclops/process/feature/feature.py +++ b/cyclops/process/feature/feature.py @@ -37,9 +37,6 @@ setup_logging(print_level="INFO", logger=LOGGER) -# mypy: ignore-errors - - class FeatureMeta: """Feature metadata class. @@ -54,7 +51,7 @@ class FeatureMeta: """ - def __init__(self, **kwargs) -> None: + def __init__(self, **kwargs: Any) -> None: """Init.""" # Feature type checking if FEATURE_TYPE_ATTR not in kwargs: @@ -89,7 +86,10 @@ def get_type(self) -> str: Feature type. """ - return getattr(self, FEATURE_TYPE_ATTR) + feature_type = getattr(self, FEATURE_TYPE_ATTR) + assert isinstance(feature_type, str) + + return feature_type def is_target(self) -> bool: """Get whether the feature is a target. @@ -100,9 +100,12 @@ def is_target(self) -> bool: Whether the feature is a target. """ - return getattr(self, FEATURE_TARGET_ATTR) + is_target = getattr(self, FEATURE_TARGET_ATTR) + assert isinstance(is_target, bool) - def indicator_of(self) -> Optional[str]: + return is_target + + def indicator_of(self) -> Union[str, None]: """Get the name of an indicator's original categorical feature. Returns @@ -112,9 +115,12 @@ def indicator_of(self) -> Optional[str]: not a categorical indicator. """ - return getattr(self, FEATURE_INDICATOR_ATTR) + indicator_of = getattr(self, FEATURE_INDICATOR_ATTR) + assert isinstance(indicator_of, (str, type(None))) + + return indicator_of - def get_mapping(self) -> Optional[dict]: + def get_mapping(self) -> Union[Dict[int, Any], None]: """Get the category value map for binary and ordinal categories. Returns @@ -124,9 +130,12 @@ def get_mapping(self) -> Optional[dict]: there is no mapping. """ - return getattr(self, FEATURE_MAPPING_ATTR) + mapping = getattr(self, FEATURE_MAPPING_ATTR) + assert isinstance(mapping, (dict, type(None))) + + return mapping - def update(self, meta: List[tuple]) -> None: + def update(self, meta: List[Tuple[str, Any]]) -> None: """Update meta attributes. Parameters @@ -168,49 +177,37 @@ def __init__( features: Union[str, List[str]], by: Optional[Union[str, List[str]]] = None, targets: Optional[Union[str, List[str], None]] = None, - force_types: Optional[dict] = None, + force_types: Optional[Dict[str, str]] = None, normalizers: Optional[Dict[str, GroupbyNormalizer]] = None, ): """Init.""" # Check data if not isinstance(data, pd.DataFrame): raise ValueError("Feature data must be a pandas.DataFrame.") - if not has_range_index(data): raise ValueError( "Data required to have a range index. Try resetting the index.", ) - # Force a range index data = to_range_index(data) - feature_list = to_list(features) - target_list = to_list_optional(targets, none_to_empty=True) - + target_list = [] if targets is None else to_list(targets) if len(feature_list) == 0: raise ValueError("Must specify at least one feature.") - has_columns(data, feature_list, raise_error=True) has_columns(data, target_list, raise_error=True) - - self.by = to_list_optional(by) - - if self.by is not None: - has_columns(data, self.by, raise_error=True) - - if len(set(self.by).intersection(set(feature_list))) != 0: + self.by_ = [] if by is None else to_list(by) + if self.by_: + has_columns(data, self.by_, raise_error=True) + if len(set(self.by_).intersection(set(feature_list))) != 0: raise ValueError("Columns in 'by' cannot be considered features.") - # Add targets to the list of features if they were not included self.features = list(set(feature_list + target_list)) - # Type checking and inference data = normalize_data(data, self.features) - self.data = data self.meta: Dict[str, FeatureMeta] = {} self._infer_feature_types(force_types=force_types) - self.normalizers: Dict[str, GroupbyNormalizer] = {} self.normalized: Dict[str, bool] = {} if normalizers is not None: @@ -238,24 +235,21 @@ def get_data( """ data = self.data - # Take only the feature columns - if features_only: - data = data[self.features + to_list_optional(self.by, none_to_empty=True)] - + if self.by_ and features_only: + data = data[self.by_ + self.features] # Convert to binary categorical indicators if to_binary_indicators is not None: data = self._ordinal_to_indicators(data, to_list(to_binary_indicators)) - # Convert binary columns from boolean to integer binary_cols = [col for col, value in self.types.items() if value == BINARY] for col in binary_cols: data[col] = data[col].astype(int) - return data.set_index(self.by) + return data.set_index(self.by_) @property - def columns(self) -> List[str]: + def columns(self) -> pd.Index: """Access as attribute, data columns. Returns @@ -301,7 +295,7 @@ def feature_names( return features @property - def types(self) -> dict: + def types(self) -> Dict[str, str]: """Access as attribute, feature type names. Note: These are framework-specific feature names. @@ -342,7 +336,9 @@ def features_by_type(self, type_: str) -> List[str]: """ return [name for name, ftype in self.types.items() if ftype == type_] - def split_by_values(self, value_splits: List[np.ndarray]) -> Tuple: + def split_by_values( + self, value_splits: List[np.typing.NDArray[Any]] + ) -> Tuple["Features", ...]: """Split the data into multiple datasets by values. Parameters @@ -357,21 +353,17 @@ def split_by_values(self, value_splits: List[np.ndarray]) -> Tuple: A tuple of Features objects with the split data. """ - on_col = self.by[0] + on_col = self.by_[0] unique = self.data[on_col].unique() unique.sort() - all_vals = np.concatenate(value_splits) all_vals.sort() - if not np.array_equal(unique, all_vals): raise ValueError("Invalid split values.") - datas = [] - for split in value_splits: + for value_split in value_splits: data_copy = self.data.copy() - datas.append(data_copy[data_copy[on_col].isin(split)]) - + datas.append(data_copy[data_copy[on_col].isin(value_split)]) save_data = self.data self.data = None splits = [copy.deepcopy(self) for _ in range(len(value_splits))] @@ -386,7 +378,7 @@ def split( fractions: Union[float, List[float]] = 1.0, randomize: bool = True, seed: Optional[int] = None, - ): + ) -> Tuple["Features", ...]: """Split the data into multiple datasets by fractions. Parameters @@ -412,7 +404,7 @@ def compute_value_splits( fractions: Union[float, List[float]] = 1.0, randomize: bool = True, seed: Optional[int] = None, - ) -> List[np.ndarray]: + ) -> List[np.typing.NDArray[Any]]: """Compute the value splits given fractions. Parameters @@ -431,7 +423,7 @@ def compute_value_splits( with values determined how the splits are segmented. """ - on_col = self.by[0] + on_col = self.by_[0] unique = self.data[on_col].unique() unique.sort() idx_splits = list( @@ -439,7 +431,7 @@ def compute_value_splits( ) return [np.take(unique, split) for split in idx_splits] - def _update_meta(self, meta_update: dict) -> None: + def _update_meta(self, meta_update: Dict[str, Dict[str, Any]]) -> None: """Update feature metadata. Parameters @@ -458,7 +450,7 @@ def _update_meta(self, meta_update: dict) -> None: def _to_feature_types( self, data: pd.DataFrame, - new_types: dict, + new_types: Dict[str, str], inplace: bool = True, ) -> pd.DataFrame: """Convert feature types. @@ -481,7 +473,6 @@ def _to_feature_types( invalid = set(new_types.keys()) - set(self.features) if len(invalid) > 0: raise ValueError(f"Unrecognized features: {', '.join(invalid)}") - for col, new_type in new_types.items(): if col in self.meta: # Do not allow converting to categorical indicators inplace @@ -489,9 +480,7 @@ def _to_feature_types( raise ValueError( f"Cannot convert {col} to binary categorical indicators.", ) - data, meta = to_types(data, new_types) - if inplace: # Append any new indicator features for col, fmeta in meta.items(): @@ -505,8 +494,8 @@ def _to_feature_types( def _infer_feature_types( self, - force_types: Optional[dict] = None, - ): + force_types: Optional[Dict[str, str]] = None, + ) -> None: """Infer feature types. Can optionally force certain types on specified features. @@ -517,12 +506,12 @@ def _infer_feature_types( A map from the feature name to the forced feature type. """ + if force_types is None: + force_types = {} infer_features = to_list( - set(self.features) - set(to_list_optional(force_types, none_to_empty=True)), + set(self.features) - set(to_list(force_types)), ) - new_types = infer_types(self.data, infer_features) - # Force certain features to be specific types if force_types is not None: for feature, type_ in force_types.items(): @@ -549,14 +538,11 @@ def add_normalizer( raise ValueError( "A normalizer with this key already exists. Consider first removing it.", ) - by = normalizer.get_by() if by is not None: has_columns(self.data, by, raise_error=True) - normalizer_map = normalizer.get_map() features = set(normalizer_map.keys()) - # Check to make sure none of the feature exist in another normalizer for norm_key, norm in self.normalizers.items(): norm_set = set(norm.normalizer_map.keys()) @@ -565,21 +551,18 @@ def add_normalizer( raise ValueError( f"Features {', '.join(intersect)} exist in normalizer {norm_key}.", ) - # Check for non-existent columns in the map nonexistent = set(normalizer_map.keys()) - set(self.features) if len(nonexistent) > 0: raise ValueError( f"The following columns are not features: {', '.join(nonexistent)}.", ) - # Check for invalid non-numeric columns is_numeric = [self.meta[col].get_type() == NUMERIC for col in normalizer_map] if not all(is_numeric): raise ValueError( "Only numeric features may be normalized. Confirm feature choice/type.", ) - gbn = GroupbyNormalizer(normalizer_map, by) gbn.fit(self.data) self.normalizers[key] = gbn @@ -721,7 +704,7 @@ def slice( slice_map: Optional[Dict[str, Union[Any, List[Any]]]] = None, slice_query: Optional[str] = None, replace: bool = False, - ) -> np.ndarray: + ) -> Any: """Slice the data across column(s), given values. Parameters @@ -748,22 +731,20 @@ def slice( for slice_col, slice_vals in slice_map.items(): sliced_indices.append( self.data[self.data[slice_col].isin(to_list(slice_vals))][ - self.by[0] + self.by_[0] ].values, ) if sliced_indices: intersect_indices = set.intersection(*map(set, sliced_indices)) - sliced_data = self.data[self.data[self.by[0]].isin(intersect_indices)] + sliced_data = self.data[self.data[self.by_[0]].isin(intersect_indices)] else: sliced_data = self.data - if slice_query: sliced_data = sliced_data.query(slice_query) - if replace: self.data = sliced_data - return sliced_data[self.by[0]].values + return sliced_data[self.by_[0]].values class TabularFeatures(Features): @@ -775,7 +756,7 @@ def __init__( features: Union[str, List[str]], by: str, targets: Optional[Union[str, List[str]]] = None, - force_types: Optional[dict] = None, + force_types: Optional[Dict[str, str]] = None, ): """Init.""" if not isinstance(by, str): @@ -791,20 +772,18 @@ def __init__( force_types=force_types, ) - def vectorize(self, **get_data_kwargs) -> Vectorized: + def vectorize(self, **get_data_kwargs: Any) -> Vectorized: """Vectorize the tabular data. Parameters ---------- - **get_data_kwargs + **get_data_kwargs: Any Keyword arguments to be fed to get_data. Returns ------- - tuple - (data, by_map, feat_map), (pandas.DataFrame, dict, dict) - feat_map is the feature order and by_map is the order of - the by column, or None if no by was provided. + Vectorized + Vectorized data. """ if "features_only" in get_data_kwargs: @@ -816,13 +795,13 @@ def vectorize(self, **get_data_kwargs) -> Vectorized: data = self.get_data(**get_data_kwargs).reset_index() - by_map = list(data[self.by[0]].values) - data = data.drop(self.by, axis=1) + by_map = list(data[self.by_[0]].values) + data = data.drop(self.by_, axis=1) feat_map = list(data.columns) return Vectorized( data.values, indexes=[by_map, feat_map], - axis_names=[self.by[0], FEATURES], + axis_names=[self.by_[0], FEATURES], ) @@ -836,7 +815,7 @@ def __init__( by: Union[str, List[str]], timestamp_col: str, targets: Optional[Union[str, List[str]]] = None, - force_types: Optional[dict] = None, + force_types: Optional[Dict[str, str]] = None, aggregator: Optional[Aggregator] = None, ): """Init.""" @@ -850,20 +829,24 @@ def __init__( self.timestamp_col = timestamp_col self.aggregator = aggregator - self._check_aggregator() + if self.aggregator is not None: + self._check_aggregator() - def _check_aggregator(self): - if self.aggregator.get_timestamp_col() != self.timestamp_col: + def _check_aggregator(self) -> None: + """Check that the aggregator is valid.""" + if self.aggregator is None: + return + if self.aggregator.timestamp_col != self.timestamp_col: raise ValueError( "Features and aggregator timestamp columns must be the same.", ) - def aggregate(self, **aggregate_kwargs) -> pd.DataFrame: + def aggregate(self, **aggregate_kwargs: Any) -> pd.DataFrame: """Aggregate the data. Parameters ---------- - **aggregate_kwargs + **aggregate_kwargs: Any Keywords to pass to the aggregation function. Returns @@ -877,7 +860,7 @@ def aggregate(self, **aggregate_kwargs) -> pd.DataFrame: "Must pass an aggregator when creating features to aggregate.", ) - agg_features = self.aggregator.get_aggfuncs().keys() + agg_features = self.aggregator.aggfuncs.keys() # Check for non-existent columns in the map nonexistent = set(agg_features) - set(self.features) @@ -891,10 +874,10 @@ def aggregate(self, **aggregate_kwargs) -> pd.DataFrame: def split_features( features: List[Union[Features, TabularFeatures, TemporalFeatures]], - fractions: Optional[Union[float, List[float]]] = None, + fractions: Union[float, List[float]] = 1.0, randomize: bool = True, seed: Optional[int] = None, -) -> Tuple: +) -> Tuple[Tuple[Features, ...], ...]: """Split a set of features using the same uniquely identifying values. Parameters diff --git a/cyclops/process/feature/handle_types.py b/cyclops/process/feature/handle_types.py index 57d5de095..97e6a571c 100644 --- a/cyclops/process/feature/handle_types.py +++ b/cyclops/process/feature/handle_types.py @@ -27,9 +27,9 @@ def get_unique( - values: Union[np.ndarray, pd.Series], - unique: Optional[np.ndarray] = None, -) -> np.ndarray: + values: Union[np.typing.NDArray[Any], pd.Series], + unique: Optional[np.typing.NDArray[Any]] = None, +) -> np.typing.NDArray[Any]: """Get the unique values of pandas series. The utility of this function comes from checking whether the @@ -50,7 +50,7 @@ def get_unique( """ if unique is None: - return values.unique() + return values.unique() # type: ignore return unique @@ -145,7 +145,7 @@ def to_dtype(series: pd.Series, type_: str) -> pd.Series: def _valid_string( series: pd.Series, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, raise_error: bool = False, ) -> bool: """Check whether a feature is a valid string type. @@ -175,7 +175,7 @@ def _valid_string( def _convertible_to_string( series: pd.Series, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, raise_error: bool = False, ) -> bool: """Check whether a feature can be converted to type string. @@ -200,7 +200,7 @@ def _convertible_to_string( def _to_string( series: pd.Series, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, ) -> Tuple[pd.Series, Dict[str, Any]]: """Convert type to string. @@ -224,7 +224,7 @@ def _to_string( def _valid_numeric( series: pd.Series, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, raise_error: bool = False, ) -> bool: """Check whether a feature is a valid numeric type. @@ -255,7 +255,7 @@ def _valid_numeric( def _convertible_to_numeric( series: pd.Series, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, raise_error: bool = False, ) -> bool: """Check whether a feature can be converted to type numeric. @@ -290,7 +290,7 @@ def _convertible_to_numeric( def _to_numeric( series: pd.Series, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, ) -> Tuple[pd.Series, Dict[str, Any]]: """Convert type to numeric. @@ -317,7 +317,7 @@ def _convertible_to_categorical( series: pd.Series, category_min: Optional[int] = None, category_max: Optional[int] = None, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, raise_error_over_max: bool = False, raise_error_under_min: bool = False, ) -> bool: @@ -376,7 +376,7 @@ def _convertible_to_categorical( def _valid_ordinal( series: pd.Series, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, raise_error: bool = False, ) -> bool: """Check whether a feature is a valid ordinal type. @@ -411,7 +411,7 @@ def _valid_ordinal( def _convertible_to_ordinal( series: pd.Series, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, category_max: int = 20, raise_error_over_max: bool = False, ) -> bool: @@ -445,7 +445,7 @@ def _convertible_to_ordinal( def _numeric_categorical_mapping( series: pd.Series, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, ) -> Tuple[pd.Series, Dict[str, Any]]: """Map values to categories in a series. @@ -469,7 +469,7 @@ def _numeric_categorical_mapping( unique.sort() - map_dict: dict = {} + map_dict: Dict[Any, int] = {} for i, unique_val in enumerate(unique): map_dict[unique_val] = i @@ -483,7 +483,7 @@ def _numeric_categorical_mapping( def _to_ordinal( series: pd.Series, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, ) -> Tuple[pd.Series, Dict[str, Any]]: """Convert type to ordinal. @@ -508,7 +508,7 @@ def _to_ordinal( def _valid_binary( series: pd.Series, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, raise_error: bool = False, ) -> bool: """Check whether a feature is a valid binary type. @@ -543,7 +543,7 @@ def _valid_binary( def _convertible_to_binary( series: pd.Series, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, ) -> bool: """Check whether a feature can be converted to type binary. @@ -573,7 +573,7 @@ def _convertible_to_binary( def _to_binary( series: pd.Series, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, ) -> Tuple[pd.Series, Dict[str, Any]]: """Convert type to binary. @@ -605,7 +605,7 @@ def _to_binary( def _valid_categorical_indicator( series: pd.Series, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, raise_error: bool = False, ) -> bool: """Check whether a feature is a valid categorical indicator type. @@ -638,7 +638,7 @@ def _valid_categorical_indicator( def _convertible_to_categorical_indicators( series: pd.Series, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, category_max: int = 20, raise_error_over_max: bool = False, ) -> bool: @@ -669,7 +669,7 @@ def _convertible_to_categorical_indicators( def _to_categorical_indicators( data: pd.DataFrame, col: str, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, ) -> Tuple[pd.DataFrame, Dict[str, Any]]: """Convert type to binary categorical indicators. @@ -719,7 +719,7 @@ def _to_categorical_indicators( def convertible_to_type( series: pd.Series, type_: str, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, raise_error: bool = False, ) -> bool: """Check whether a feature can be converted to some type. @@ -768,7 +768,7 @@ def convertible_to_type( def is_valid( series: pd.Series, type_: str, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, ) -> bool: """Check whether a feature is valid as a given type. @@ -843,7 +843,7 @@ def _to_type( data: pd.DataFrame, col: str, new_type: str, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, ) -> Tuple[Union[pd.Series, pd.DataFrame], Dict[str, Any]]: """Convert a feature to a given type. @@ -898,7 +898,7 @@ def _to_type( def to_types( data: pd.DataFrame, - new_types: dict, + new_types: Dict[str, str], ) -> Tuple[pd.DataFrame, Dict[str, Any]]: """Convert features to given types. @@ -926,7 +926,7 @@ def to_types( def _infer_type( series: pd.Series, - unique: Optional[np.ndarray] = None, + unique: Optional[np.typing.NDArray[Any]] = None, ) -> str: """Infer intended feature type and perform the relevant conversion. diff --git a/cyclops/process/feature/normalize.py b/cyclops/process/feature/normalize.py index 061380f58..6860ef542 100644 --- a/cyclops/process/feature/normalize.py +++ b/cyclops/process/feature/normalize.py @@ -50,7 +50,7 @@ def __init__(self, method: str): self.scaler = METHOD_MAP[method]() - def __repr__(self): + def __repr__(self) -> str: """Repr method. Returns @@ -61,7 +61,7 @@ def __repr__(self): """ return self.method - def fit(self, data: Union[np.ndarray, pd.Series]) -> None: + def fit(self, data: Union[np.typing.NDArray[Any], pd.Series]) -> None: """Fit the scaler. Parameters @@ -93,9 +93,9 @@ def fit(self, data: Union[np.ndarray, pd.Series]) -> None: def _transform_by_method( self, - data: Union[np.ndarray, pd.Series], + data: Union[np.typing.NDArray[Any], pd.Series], method: str, - ) -> Union[np.ndarray, pd.Series]: + ) -> Union[np.typing.NDArray[Any], pd.Series]: """Apply a method on the scaler. If a numpy.ndarray is given, a numpy.ndarray is returned. Similarly, if a @@ -142,8 +142,8 @@ def _transform_by_method( def transform( self, - data: Union[np.ndarray, pd.Series], - ) -> Union[np.ndarray, pd.Series]: + data: Union[np.typing.NDArray[Any], pd.Series], + ) -> Union[np.typing.NDArray[Any], pd.Series]: """Apply normalization. If a numpy.ndarray is given, a numpy.ndarray is returned. Similarly, if a @@ -164,8 +164,8 @@ def transform( def inverse_transform( self, - data: Union[np.ndarray, pd.Series], - ) -> Union[np.ndarray, pd.Series]: + data: Union[np.typing.NDArray[Any], pd.Series], + ) -> Union[np.typing.NDArray[Any], pd.Series]: """Apply inverse normalization. If a numpy.ndarray is given, a numpy.ndarray is returned. Similarly, if a @@ -205,7 +205,7 @@ class GroupbyNormalizer: def __init__( self, - normalizer_map: dict, + normalizer_map: Dict[str, str], by: Optional[Union[str, List[str]]] = None, ) -> None: """Initialize.""" @@ -219,7 +219,7 @@ def __init__( self.by = to_list_optional(by) self.normalizers = None - def get_map(self) -> dict: + def get_map(self) -> Dict[str, str]: """Get normalization mapping from features to type. Returns @@ -230,7 +230,7 @@ def get_map(self) -> dict: """ return self.normalizer_map - def get_by(self) -> Optional[List]: + def get_by(self) -> Optional[List[str]]: """Get groupby columns. Returns @@ -255,7 +255,7 @@ def fit(self, data: pd.DataFrame) -> None: "DataFrame required to have a range index. Try resetting the index.", ) - def get_normalizer_for_group(group: pd.DataFrame): + def get_normalizer_for_group(group: pd.DataFrame) -> pd.DataFrame: cols = [] data = [] for col, method in self.normalizer_map.items(): @@ -306,7 +306,7 @@ def _transform_by_method(self, data: pd.DataFrame, method: str) -> pd.DataFrame: "DataFrame required to have a range index. Try resetting the index.", ) - def transform_group(group): + def transform_group(group) -> pd.DataFrame: for col in self.normalizer_map: # Get normalizer object and transform normalizer = self.normalizers.loc[group.index.values[0]][col] @@ -335,7 +335,7 @@ def transform_group(group): return data - def transform(self, data: pd.DataFrame): + def transform(self, data: pd.DataFrame) -> pd.DataFrame: """Normalize the data. Parameters @@ -351,7 +351,7 @@ def transform(self, data: pd.DataFrame): """ return self._transform_by_method(data, "transform") - def inverse_transform(self, data: pd.DataFrame): + def inverse_transform(self, data: pd.DataFrame) -> pd.DataFrame: """Inversely normalize the data. Parameters @@ -387,7 +387,7 @@ class VectorizedNormalizer: def __init__( self, axis: int, - normalizer_map: dict, + normalizer_map: Dict[Any, str], ) -> None: """Initialize.""" self.axis = axis @@ -395,7 +395,7 @@ def __init__( self.normalizers: Dict[Any, SklearnNormalizer] = {} self.is_fit: bool = False - def get_map(self) -> Optional[dict]: + def get_map(self) -> Optional[Dict[str, str]]: """Get normalization mapping from features to type. Returns @@ -422,7 +422,7 @@ def _check_missing(self, index_map: Dict[str, int]) -> None: if len(missing) != 0: raise ValueError(f"Missing features {', '.join(missing)} in the data.") - def fit(self, data: np.ndarray, index_map: Dict[str, int]) -> None: + def fit(self, data: np.typing.NDArray[Any], index_map: Dict[str, int]) -> None: """Fit the normalizing objects. Parameters @@ -454,10 +454,10 @@ def fit(self, data: np.ndarray, index_map: Dict[str, int]) -> None: def _transform_by_method( self, - data: np.ndarray, + data: np.typing.NDArray[Any], index_map: Dict[str, int], method: str, - ) -> np.ndarray: + ) -> np.typing.NDArray[Any]: """Apply a method from the normalizer object to the data. Parameters @@ -491,7 +491,9 @@ def _transform_by_method( return data - def transform(self, data: np.ndarray, index_map: Dict[str, int]): + def transform( + self, data: np.typing.NDArray[Any], index_map: Dict[str, int] + ) -> np.typing.NDArray[Any]: """Normalize the data. Parameters @@ -512,7 +514,9 @@ def transform(self, data: np.ndarray, index_map: Dict[str, int]): return self._transform_by_method(data, index_map, "transform") - def inverse_transform(self, data: np.ndarray, index_map: Dict[str, int]): + def inverse_transform( + self, data: np.typing.NDArray[Any], index_map: Dict[str, int] + ) -> np.typing.NDArray[Any]: """Inversely normalize the data. Parameters @@ -547,7 +551,7 @@ def set_normalizers(self, normalizers: Dict[str, Any]) -> None: self.normalizers = normalizers self.is_fit = True - def subset(self, indexes: np.ndarray) -> VectorizedNormalizer: + def subset(self, indexes: np.typing.NDArray[Any]) -> VectorizedNormalizer: """Subset the normalizers and return this new VectorizedNormalizer. Parameters diff --git a/cyclops/process/feature/split.py b/cyclops/process/feature/split.py index 1c1b52fb2..024facf0a 100644 --- a/cyclops/process/feature/split.py +++ b/cyclops/process/feature/split.py @@ -1,6 +1,7 @@ """Dataset split processing.""" from typing import ( + Any, Generator, Iterable, List, @@ -44,7 +45,7 @@ def __normalize_fractions( def fractions_to_split( fractions: Union[int, float, Sequence[Union[float, int]]], n_samples: int, -) -> np.ndarray: +) -> Any: """Create an array of index split points useful for dataset splitting. Created using the length of the data and the desired split fractions. @@ -103,7 +104,7 @@ def split_idx( n_samples: int, randomize: bool = True, seed: Optional[int] = None, -) -> tuple: +) -> Tuple[np.typing.NDArray[np.int_], ...]: """Create disjoint subsets of indices. Parameters @@ -125,7 +126,6 @@ def split_idx( """ split = fractions_to_split(fractions, n_samples) idx = np.arange(n_samples) - # Optionally randomize if randomize: rng = np.random.default_rng(seed) @@ -136,10 +136,10 @@ def split_idx( def split_idx_stratified( fractions: Union[float, List[float]], - stratify_labels: np.ndarray, + stratify_labels: np.typing.NDArray[np.int_], randomize: bool = True, seed: Optional[int] = None, -) -> tuple: +) -> Tuple[np.typing.NDArray[np.int_], ...]: """Create disjoint, label-stratified subsets of indices. There will be the equal label proportions in each subset. @@ -193,7 +193,7 @@ def split_kfold( n_samples: int, randomize: bool = True, seed: Optional[int] = None, -) -> np.ndarray: +) -> Tuple[np.typing.NDArray[np.int_], ...]: """Create K disjoint subsets of indices equal in length. These K equally sized folds are useful for K-fold cross validation. @@ -220,9 +220,9 @@ def split_kfold( def idxs_to_splits( - samples: np.ndarray, - idxs: Tuple, -): + samples: np.typing.NDArray[np.float_], + idxs: Tuple[np.typing.NDArray[np.int_], ...], +) -> Tuple[np.typing.NDArray[np.float_], ...]: """Create data subsets using subsets of indices. Parameters @@ -243,10 +243,10 @@ def idxs_to_splits( def kfold_cross_val( k_folds: int, - samples: np.ndarray, + samples: np.typing.NDArray[np.float_], randomize: bool = True, seed: Optional[int] = None, -) -> Generator[Tuple[np.ndarray, np.ndarray], None, None]: +) -> Generator[Tuple[np.typing.NDArray[np.float_], ...], None, None]: """Perform K-fold cross validation. Parameters @@ -279,7 +279,7 @@ def intersect_datasets( datas: List[pd.DataFrame], on_col: str, sort: bool = True, -) -> Tuple: +) -> Tuple[pd.DataFrame, ...]: """Perform an intersection across datasets over a column. This can be used to align dataset samples e.g., aligning encounters for a tabular @@ -320,10 +320,10 @@ def intersect_datasets( def split_datasets_by_idx( - datasets: Union[np.ndarray, List[np.ndarray]], - idx_splits: Tuple, + datasets: Union[np.typing.NDArray[np.float_], List[np.typing.NDArray[np.float_]]], + idx_splits: Tuple[np.typing.NDArray[np.int_], ...], axes: Optional[Union[int, List[int]]] = None, -): +) -> Tuple[np.typing.NDArray[np.float_], ...]: """Split datasets by index over given axes. Parameters @@ -366,18 +366,18 @@ def split_datasets_by_idx( splits[-1] = tuple(splits[-1]) if len(splits) == 1: - return splits[0] + return splits[0] # type: ignore return tuple(splits) def split_datasets( - datasets: Union[np.ndarray, List[np.ndarray]], + datasets: Union[np.typing.NDArray[np.float_], List[np.typing.NDArray[np.float_]]], fractions: Union[float, List[float]], axes: Optional[Union[int, List[int]]] = None, randomize: bool = True, seed: Optional[int] = None, -) -> Tuple: +) -> Tuple[np.typing.NDArray[np.float_], ...]: """Split a dataset into a number of datasets. Parameters diff --git a/cyclops/process/feature/vectorized.py b/cyclops/process/feature/vectorized.py index 66f48ade5..a05924f6b 100644 --- a/cyclops/process/feature/vectorized.py +++ b/cyclops/process/feature/vectorized.py @@ -57,7 +57,7 @@ def process_axes( def intersect_vectorized( vecs: List[Vectorized], axes: Union[str, int, List[str], List[int]] = 0, -) -> Tuple: +) -> Tuple[Vectorized, ...]: """Perform an intersection over the indexes of vectorized datasets. This is especially useful to align the samples of separate datasets. @@ -80,7 +80,7 @@ def intersect_vectorized( axes_list: List[int] = process_axes(vecs, axes) # Get intersection - index_sets = [set(vec.get_index(axes_list[i])) for i, vec in enumerate(vecs)] + index_sets = [set(vec.get_index(axes_list[i])) for i, vec in enumerate(vecs)] # type: ignore intersect = np.array(list(set.intersection(*index_sets))) # Return intersected datasets @@ -97,7 +97,7 @@ def split_vectorized( axes: Union[str, int, List[str], List[int]] = 0, randomize: bool = True, seed: Optional[int] = None, -) -> Tuple: +) -> Tuple[Tuple[Vectorized, ...], ...]: """Split vectorized datasets matching the index. Parameters @@ -144,7 +144,7 @@ def split_vectorized( ) splits = [ - vec.split_by_indices(axes_list[i], index_splits) for i, vec in enumerate(vecs) + vec.split_by_indices(axes_list[i], index_splits) for i, vec in enumerate(vecs) # type: ignore ] return tuple(splits) @@ -171,34 +171,29 @@ class Vectorized: def __init__( self, - data: np.ndarray, - indexes: List[Union[List, np.ndarray]], + data: np.typing.NDArray[Any], + indexes: Sequence[Union[List[Any], np.typing.NDArray[Any]]], axis_names: List[str], is_normalized: bool = False, ) -> None: """Init.""" if not isinstance(data, np.ndarray): raise ValueError("Data must be a numpy.ndarray.") - if len(indexes) != data.ndim: raise ValueError( "Number of array axes and the number of indexes do not match.", ) - if len(axis_names) != data.ndim: raise ValueError( "Number of array axes and the number of axis names do not match.", ) - if not all(isinstance(name, str) for name in axis_names): raise ValueError("Axis names must be strings.") - for i, index in enumerate(indexes): if not isinstance(index, list) and not isinstance(index, np.ndarray): raise ValueError("Indexes must be a list of list or numpy.ndarray.") index = np.array(index) - if len(index) != data.shape[i]: raise ValueError( ( @@ -210,21 +205,18 @@ def __init__( raise ValueError( "Each index must have no duplicate values to uniquely identify.", ) - - indexes[i] = index - - self.data: np.ndarray = data - self.indexes: List[np.ndarray] = indexes - self.index_maps: List[Dict[str, int]] = [ + indexes[i] = index # type: ignore + self.data: np.typing.NDArray[Any] = data + self.indexes: List[np.typing.NDArray[Any]] = indexes # type: ignore + self.index_maps: List[Dict[Any, int]] = [ {val: i for i, val in enumerate(index)} for index in indexes ] self.axis_names: List[str] = axis_names - self.normalizer: Optional[VectorizedNormalizer] = None self.is_normalized = is_normalized @property - def shape(self) -> Tuple: + def shape(self) -> Tuple[int, ...]: """Get data shape, as an attribute. Returns @@ -235,7 +227,7 @@ def shape(self) -> Tuple: """ return self.data.shape - def get_data(self) -> np.ndarray: + def get_data(self) -> np.typing.NDArray[Any]: """Get the vectorized data. Returns @@ -250,7 +242,7 @@ def add_normalizer( self, axis: Union[str, int], normalization_method: Optional[str] = None, - normalizer_map: Optional[dict] = None, + normalizer_map: Optional[Dict[str, str]] = None, ) -> None: """Add a normalizer. @@ -284,15 +276,13 @@ def add_normalizer( axis_index = self.get_axis(axis) index_map = self.index_maps[axis_index] - if normalizer_map is None: # Use the same normalization method for all features - normalizer_map = {feat: normalization_method for feat in index_map} + normalizer_map = {feat: normalization_method for feat in index_map} # type: ignore else: missing = set(normalizer_map.keys()) - set(index_map.keys()) if len(missing) != 0: raise ValueError(f"Invalid index values {', '.join(missing)}.") - normalizer = VectorizedNormalizer(axis_index, normalizer_map) self.normalizer = normalizer @@ -310,17 +300,14 @@ def add_normalizer_direct(self, normalizer: VectorizedNormalizer) -> None: if self.normalizer is not None: LOGGER.warning("Replacing existing normalizer.") - self.normalizer = normalizer def fit_normalizer(self) -> None: """Fit the normalizer.""" if self.normalizer is None: raise ValueError("Must add a normalizer.") - if self.normalizer.is_fit: LOGGER.warning("Re-fitting existing normalizer.") - index_map = self.index_maps[self.normalizer.axis] self.normalizer.fit(self.data, index_map) @@ -332,10 +319,8 @@ def normalize(self) -> None: """ if self.normalizer is None: raise ValueError("No normalizer was added.") - if self.is_normalized: raise ValueError("Data normalized. Cannot normalize.") - index_map = self.index_maps[self.normalizer.axis] self.normalizer.transform(self.data, index_map) self.is_normalized = True @@ -348,10 +333,8 @@ def inverse_normalize(self) -> None: """ if self.normalizer is None: raise ValueError("No normalizer was added.") - if not self.is_normalized: raise ValueError("Data not normalized. Cannot inverse normalize.") - index_map = self.index_maps[self.normalizer.axis] self.normalizer.inverse_transform(self.data, index_map) self.is_normalized = False @@ -377,7 +360,7 @@ def save(self, save_path: str, file_format: str = "npy") -> str: def take_with_indices( self, axis: Union[str, int], - indices: Union[List[int], np.ndarray], + indices: Union[List[Any], np.typing.NDArray[Any]], ) -> Vectorized: """Get data by indexing an axis. @@ -395,21 +378,17 @@ def take_with_indices( """ axis_index = self.get_axis(axis) - # Index the data accordingly data = take_indices_over_axis(self.data, axis_index, indices) - # Create the corresponding indexes new_indexes = list(self.indexes) - new_indexes[axis_index] = [self.indexes[axis_index][ind] for ind in indices] - + new_indexes[axis_index] = [self.indexes[axis_index][ind] for ind in indices] # type: ignore vec = Vectorized( data, new_indexes, self.axis_names, is_normalized=self.is_normalized, ) - # Add normalizers (and possibly a subset of the existing normalizers if # splitting on the normalization axis) if self.normalizer is not None: @@ -417,7 +396,6 @@ def take_with_indices( normalizer = copy.deepcopy(self.normalizer) else: normalizer = self.normalizer.subset(vec.indexes[self.normalizer.axis]) - vec.add_normalizer_direct(normalizer) return vec @@ -425,7 +403,7 @@ def take_with_indices( def take_with_index( self, axis: Union[str, int], - index: Union[List[Any], np.ndarray], + index: Union[List[Any], np.typing.NDArray[Any]], ) -> Vectorized: """Get data by indexing an axis using its index. @@ -444,15 +422,12 @@ def take_with_index( """ axis_index = self.get_axis(axis) index_map = self.index_maps[axis_index] - if not isinstance(index, list) and not isinstance(index, np.ndarray): raise ValueError("Index must either be a list or a NumPy array.") - # Map values to indices missing = [val for val in index if val not in index_map] if len(missing) > 0: raise ValueError(f"Index does not have values {', '.join(missing)}.") - indices = [index_map[val] for val in index] return self.take_with_indices(axis_index, indices) @@ -476,7 +451,6 @@ def get_axis(self, axis: Union[int, str]) -> int: if axis >= len(self.indexes) or axis < 0: raise ValueError("Axis out of bounds.") return axis - # If an axis name was given if isinstance(axis, str): if self.axis_names is None: @@ -491,7 +465,7 @@ def get_axis(self, axis: Union[int, str]) -> int: raise ValueError("Axis is an invalid type. Must be an int or string.") - def get_index(self, axis: Union[int, str]) -> np.ndarray: + def get_index(self, axis: Union[int, str]) -> np.typing.ArrayLike: """Get an axis index by index or by name. Parameters @@ -507,7 +481,7 @@ def get_index(self, axis: Union[int, str]) -> np.ndarray: """ return self.indexes[self.get_axis(axis)] - def get_index_map(self, axis: Union[int, str]) -> np.ndarray: + def get_index_map(self, axis: Union[int, str]) -> Dict[Any, int]: """Get an axis index by index or by name. Parameters @@ -526,9 +500,9 @@ def get_index_map(self, axis: Union[int, str]) -> np.ndarray: def split_by_indices( self, axis: Union[str, int], - indices: Union[Sequence[int], np.ndarray], + indices: List[Union[List[Any], np.typing.NDArray[Any]]], allow_drops: bool = False, - ): + ) -> Tuple[Vectorized, ...]: """Split the data over an axis using indices. Parameters @@ -548,7 +522,6 @@ def split_by_indices( """ axis_index = self.get_axis(axis) - # Check for invalid duplicate indices all_vals = np.concatenate(indices).ravel() if len(all_vals) != len(np.unique(all_vals)): @@ -556,13 +529,11 @@ def split_by_indices( "Splits cannot contain duplicate values. " "Ensure all values are unique across the splits.", ) - if not allow_drops: required_indices = set(np.arange(self.data.shape[axis_index])) diff = required_indices - set(all_vals) if len(diff) > 0: raise ValueError("Not allowing dropping and missing certain values.") - vec_splits = [] for split_indices in indices: vec_splits.append(self.take_with_indices(axis_index, split_indices)) @@ -572,9 +543,9 @@ def split_by_indices( def split_by_index( self, axis: Union[str, int], - index_names: List[Union[List[Any], np.ndarray]], + index_names: List[Union[List[Any], np.typing.NDArray[Any]]], allow_drops: bool = False, - ): + ) -> Tuple[Vectorized, ...]: """Split the data over an axis using index names. Parameters @@ -595,8 +566,7 @@ def split_by_index( """ axis_index = self.get_axis(axis) index_map = self.index_maps[axis_index] - - indices: List[Union[List[int], np.ndarray]] = [] + indices: List[Union[List[Any], np.typing.NDArray[Any]]] = [] for names in index_names: indices.append([]) for name in names: @@ -605,7 +575,7 @@ def split_by_index( if allow_drops: continue raise ValueError(f"Invalid index name {name}.") - indices[-1].append(index_map[name]) + indices[-1].append(index_map[name]) # type: ignore indices[-1] = np.array(indices[-1]) return self.split_by_indices( @@ -620,7 +590,7 @@ def split_by_fraction( fractions: Union[float, List[float]], randomize: bool = True, seed: Optional[int] = None, - ): + ) -> Tuple[Vectorized, ...]: """Split the data over an axis using split fractions. Parameters @@ -642,7 +612,6 @@ def split_by_fraction( """ axis_index = self.get_axis(axis) - indices = split_idx( fractions=fractions, n_samples=self.data.shape[axis_index], @@ -652,15 +621,15 @@ def split_by_fraction( return self.split_by_indices( axis=axis_index, - indices=indices, + indices=indices, # type: ignore allow_drops=False, ) def split_out( self, axis: Union[str, int], - index_names: Union[List[Any], np.ndarray], - ): + index_names: Union[List[Any], np.typing.ArrayLike], + ) -> Tuple[Vectorized, ...]: """Split out some indexes by name. Parameters @@ -690,8 +659,8 @@ def split_out( def remove_with_index( self, axis: Union[str, int], - index_names: Union[List[Any], np.ndarray], - ): + index_names: Union[List[Any], np.typing.ArrayLike], + ) -> Vectorized: """Split out some indexes by name. Parameters @@ -710,6 +679,7 @@ def remove_with_index( axis_index = self.get_axis(axis) index_names = np.array(index_names) remaining = np.setdiff1d(self.indexes[axis_index], index_names) + return self.take_with_index(axis_index, remaining) def rename_axis(self, axis: Union[str, int], name: str) -> None: @@ -746,15 +716,12 @@ def swap_axes( # Process axes axis1_index: int = self.get_axis(axis1) axis2_index: int = self.get_axis(axis2) - # Call moveaxis before meta changes in case there are errors self.data = np.swapaxes(self.data, axis1_index, axis2_index) - # Update meta self.indexes = list_swap(self.indexes, axis1_index, axis2_index) self.index_maps = list_swap(self.index_maps, axis1_index, axis2_index) self.axis_names = list_swap(self.axis_names, axis1_index, axis2_index) - # Update axis on which the normalizer acts if it was switched if self.normalizer is not None: if self.normalizer.axis == axis1_index: @@ -766,7 +733,7 @@ def value_counts( self, axis: Union[str, int], index: Any, - ) -> Tuple[np.ndarray, np.ndarray]: + ) -> Tuple[Any, ...]: """Return the value counts for a given axis and index. Parameters @@ -780,9 +747,10 @@ def value_counts( axis_index = self.get_axis(axis) index_map = self.index_maps[axis_index] data = take_indices_over_axis(self.data, axis_index, [index_map[index]]) + return np.unique(data, return_counts=True) - def _check_index_exp(self, index_exp: Tuple) -> None: + def _check_index_exp(self, index_exp: Tuple[slice, ...]) -> None: """Check that an index expression is valid. Parameters @@ -793,7 +761,6 @@ def _check_index_exp(self, index_exp: Tuple) -> None: """ if not isinstance(index_exp, tuple): raise ValueError("Index expression must be a tuple.") - for i in index_exp: if not isinstance(i, slice): raise ValueError( @@ -806,9 +773,9 @@ def _check_index_exp(self, index_exp: Tuple) -> None: def impute_over_axis( self, axis: Union[str, int], - impute_fn: Callable, - index_exp: Optional[Tuple] = None, - ): + impute_fn: Callable[[np.typing.ArrayLike], np.typing.ArrayLike], + index_exp: Optional[Tuple[slice, ...]] = None, + ) -> None: """Imputes values over an axis, treating the other axes as grouping. For example, imputing over the timesteps of an event for some encounter. @@ -824,7 +791,6 @@ def impute_over_axis( """ axis_index = self.get_axis(axis) - if index_exp is not None: self._check_index_exp(index_exp) sliced_data = self.data[index_exp] @@ -833,7 +799,6 @@ def impute_over_axis( axis_index, sliced_data, ) - else: self.data = np.apply_along_axis(impute_fn, axis_index, self.data) @@ -841,8 +806,8 @@ def impute( self, impute_axis: Union[str, int], data_axis: Union[str, int], - impute_fn: Callable, - ): + impute_fn: Callable[[np.typing.ArrayLike], np.typing.ArrayLike], + ) -> None: """Impute values with forward fill and/or backward fill and fill null values \ with feature mean. @@ -857,7 +822,6 @@ def impute( """ self.impute_over_axis(impute_axis, impute_fn) - axis = self.get_axis(data_axis) for i in range(self.data.shape[axis]): @@ -870,8 +834,8 @@ def impute( def concat_over_axis( self, axis: Union[str, int], - arr: np.ndarray, - concat_index: Union[List, np.ndarray], + arr: np.typing.NDArray[Any], + concat_index: Union[List[Any], np.typing.NDArray[Any]], ) -> Vectorized: """Concatenate an array over an axis to create a new Vectorized object. @@ -897,19 +861,15 @@ def concat_over_axis( """ axis_index = self.get_axis(axis) concat_index = np.array(concat_index) - # Check dimensionality for issues shape = self.shape arr_shape = arr.shape - if len(concat_index) != arr.shape[axis_index]: raise ValueError( "Incorrect number of index names for the data to concatenate.", ) - if len(shape) != len(arr_shape): raise ValueError("Array must have the same number of dimensions.") - equal_shape = [arr_shape[i] == shape[i] for i in range(len(shape))] equal_shape = ( equal_shape[:axis_index] + equal_shape[axis_index + 1 :] # noqa: E203 @@ -918,16 +878,15 @@ def concat_over_axis( raise ValueError( "Array shape must be identical except along the concatenated axis.", ) - # Check that none of the new indexes already exist index_inter = np.intersect1d(self.indexes[axis_index], concat_index) if len(index_inter) > 0: raise ValueError(f"Forbidden intersection of indexes: {index_inter}.") - # Concatenate and return a new Vectorized object res = np.concatenate([self.data, arr], axis=axis_index) indexes = [ind.copy() for ind in self.indexes] indexes[axis_index] = np.concatenate([indexes[axis_index], concat_index]) + return Vectorized(res, indexes, self.axis_names) @@ -940,7 +899,7 @@ class VectorizedIndexExpression: """ - def __getitem__(self, item: Tuple) -> Tuple: + def __getitem__(self, item: Tuple[Any, ...]) -> Tuple[slice, ...]: """Create index expression using slice notation. Parameters diff --git a/cyclops/process/impute.py b/cyclops/process/impute.py index 753812d32..b67ae9564 100644 --- a/cyclops/process/impute.py +++ b/cyclops/process/impute.py @@ -52,7 +52,7 @@ def compute_inter_range(null: pd.Series) -> Optional[Tuple[int, int]]: return inds[0], inds[-1] + 1 -def np_ffill(arr: np.ndarray) -> np.ndarray: +def np_ffill(arr: np.typing.NDArray[np.float64]) -> Any: """Forward fill a 1D array. Parameters @@ -64,10 +64,11 @@ def np_ffill(arr: np.ndarray) -> np.ndarray: mask = np.isnan(arr) idx = np.where(~mask, np.arange(mask.shape[0]), 0) idx = np.maximum.accumulate(idx, axis=0, out=idx) + return arr[idx] -def np_bfill(arr: np.ndarray) -> np.ndarray: +def np_bfill(arr: np.typing.NDArray[np.float64]) -> Any: """Backward fill a 1D array. Parameters @@ -87,7 +88,7 @@ def np_bfill(arr: np.ndarray) -> np.ndarray: return arr[idx] -def np_ffill_bfill(arr: np.ndarray) -> np.ndarray: +def np_ffill_bfill(arr: np.typing.NDArray[np.float64]) -> np.typing.NDArray[np.float64]: """Equivalent to forward filling and then backward filling a 1D array. Parameters @@ -109,7 +110,9 @@ def np_ffill_bfill(arr: np.ndarray) -> np.ndarray: return arr -def np_fill_null_num(arr: np.ndarray, num: float) -> np.ndarray: +def np_fill_null_num( + arr: np.typing.NDArray[np.float64], num: float +) -> np.typing.NDArray[np.float64]: """Fill null values with a number. Parameters @@ -126,7 +129,9 @@ def np_fill_null_num(arr: np.ndarray, num: float) -> np.ndarray: return np.nan_to_num(arr, nan=num) -def np_fill_null_zero(arr: np.ndarray) -> np.ndarray: +def np_fill_null_zero( + arr: np.typing.NDArray[np.float64], +) -> np.typing.NDArray[np.float64]: """Fill null values with zero. Parameters @@ -143,7 +148,9 @@ def np_fill_null_zero(arr: np.ndarray) -> np.ndarray: return np_fill_null_num(arr, 0) -def np_fill_null_mean(arr: np.ndarray) -> np.ndarray: +def np_fill_null_mean( + arr: np.typing.NDArray[np.float64], +) -> np.typing.NDArray[np.float64]: """Fill null values with the array mean. Parameters @@ -157,7 +164,7 @@ def np_fill_null_mean(arr: np.ndarray) -> np.ndarray: Imputed array. """ - return np_fill_null_num(arr, np.nanmean(arr)) + return np_fill_null_num(arr, float(np.nanmean(arr))) def fill_null_with(series: pd.Series, null: pd.Series, value: Any) -> pd.Series: @@ -253,10 +260,10 @@ class SeriesImputer: def __init__( self, - imputefunc: Union[str, Callable] = MEAN, - allow_nulls_returned=True, - limit_area=None, - ): + imputefunc: Union[str, Callable[[pd.Series, pd.Series], pd.Series]], + allow_nulls_returned: bool = True, + limit_area: Optional[str] = None, + ) -> None: """Init.""" self.using_drop = imputefunc == DROP @@ -277,7 +284,9 @@ def __init__( self.limit_area = limit_area - def _process_imputefunc(self, imputefunc: Union[str, Callable]) -> Callable: + def _process_imputefunc( + self, imputefunc: Union[str, Callable[..., Any]] + ) -> Callable[..., Any]: """Process imputation function. Convert a imputefunc string to an imputefunc if recognized. @@ -568,7 +577,7 @@ def extra(self, group: pd.DataFrame) -> pd.DataFrame: return self.extra_imputer(group) -def numpy_2d_ffill(arr: np.ndarray) -> np.ndarray: +def numpy_2d_ffill(arr: np.typing.NDArray[np.float64]) -> Any: """Foward fill a 2D array in a row-wise fashion, i.e., filling each row separately. Parameters @@ -588,4 +597,5 @@ def numpy_2d_ffill(arr: np.ndarray) -> np.ndarray: mask = np.isnan(arr) idx = np.where(~mask, np.arange(mask.shape[1]), 0) np.maximum.accumulate(idx, axis=1, out=idx) + return arr[np.arange(idx.shape[0])[:, None], idx] diff --git a/cyclops/process/string_ops.py b/cyclops/process/string_ops.py index fc3c5b323..fee5dad31 100644 --- a/cyclops/process/string_ops.py +++ b/cyclops/process/string_ops.py @@ -2,7 +2,7 @@ import re from collections import Counter -from typing import Iterable, List, Union +from typing import Iterable, List, Tuple, Union import numpy as np @@ -206,7 +206,7 @@ def normalize_special_characters(item: str) -> str: return re.sub(r"(?s:(^[0-9_].+))", "a_\1", item) -def count_occurrences(items: Iterable) -> List: +def count_occurrences(items: Iterable[str]) -> List[Tuple[str, int]]: """Count number of occurrences of the items. Parameters diff --git a/cyclops/process/util.py b/cyclops/process/util.py index 419c06c45..9dd51521a 100644 --- a/cyclops/process/util.py +++ b/cyclops/process/util.py @@ -2,11 +2,10 @@ import logging from functools import wraps -from typing import Callable, List, Optional, Union +from typing import Any, Callable, List, Optional, Union import pandas as pd -from cyclops.process.column_names import ENCOUNTER_ID from cyclops.utils.common import to_list from cyclops.utils.log import setup_logging @@ -18,7 +17,7 @@ def create_indicator_variables( features: pd.DataFrame, - columns: Optional[List] = None, + columns: Optional[List[str]] = None, ) -> pd.DataFrame: """Create binary indicator variable for each column (or specified). @@ -28,7 +27,7 @@ def create_indicator_variables( ---------- features: pandas.DataFrame Input features with missing values. - columns: list, optional + columns: List[str], optional Columns to create variables, all if not specified. Returns @@ -42,7 +41,7 @@ def create_indicator_variables( return indicator_features.notnull().astype(int).add_suffix("_indicator") -def is_timestamp_series(series, raise_error: bool = False): +def is_timestamp_series(series: pd.Series, raise_error: bool = False) -> Any: """Check whether a series has the Pandas Timestamp datatype. Parameters @@ -57,7 +56,6 @@ def is_timestamp_series(series, raise_error: bool = False): """ is_timestamp = series.dtype == pd.to_datetime(["2069-03-29 02:30:00"]).dtype - if not is_timestamp and raise_error: raise ValueError(f"{series.name} must be a timestamp Series.") @@ -104,60 +102,7 @@ def has_columns( return present -def assert_has_columns(*args, **kwargs) -> Callable: - """Decorate function to assert that input DataFrames have the necessary columns. - - assert_has_columns(["A", "B"], None) is equivalent to assert_has_columns(["A", "B"]) - but may be necessary when wanting to check, - assert_has_columns(["A"], None, ["C"]) - - Can also check keyword arguments, e.g., optional DataFrames, - assert_has_columns(["A"], optional_df=["D"]) - - Parameters - ---------- - *args - Required columns of the function's ordered DataFrame arguments. - **kwargs - Keyword corresponds to the DataFrame kwargs of the function. - The value is this keyword argument's required columns. - - Returns - ------- - Callable - Decorator function. - - """ - - def decorator(func_: Callable) -> Callable: - @wraps(func_) - def wrapper_func(*fn_args, **fn_kwargs) -> Callable: - # Check only the DataFrame arguments - dataframe_args = [i for i in fn_args if isinstance(i, pd.DataFrame)] - - assert len(args) <= len(dataframe_args) - - for i, arg in enumerate(args): - if arg is None: # Can specify None to skip over checking a DataFrame - continue - has_columns(dataframe_args[i], arg, raise_error=True) - - for key, required_cols in kwargs.items(): - # If an optional DataFrame is not provided, it will be skipped - if key not in fn_kwargs: - continue - - assert isinstance(fn_kwargs[key], pd.DataFrame) - has_columns(fn_kwargs[key], required_cols, raise_error=True) - - return func_(*fn_args, **fn_kwargs) - - return wrapper_func - - return decorator - - -def has_range_index(data: pd.DataFrame) -> bool: +def has_range_index(data: pd.DataFrame) -> Union[bool, pd.Series, pd.DataFrame]: """Check whether a DataFrame has a range index. Parameters @@ -167,7 +112,7 @@ def has_range_index(data: pd.DataFrame) -> bool: Returns ------- - bool + bool or pandas.Series or pandas.DataFrame Whether the data has a range index. """ @@ -218,13 +163,21 @@ def gather_columns(data: pd.DataFrame, columns: Union[List[str], str]) -> pd.Dat return data[to_list(columns)].copy() -def log_counts_step(data, step_description: str, rows=True, columns=False) -> None: +def log_df_counts( + data: pd.DataFrame, + col: str, + step_description: str, + rows: bool = True, + columns: bool = False, +) -> None: """Log num. of encounters and num. of samples (rows). Parameters ---------- data: pandas.DataFrame Encounter specific input data. + col: str + Column name to count. step_description: str Description of intermediate processing step. rows: bool @@ -234,7 +187,7 @@ def log_counts_step(data, step_description: str, rows=True, columns=False) -> No """ LOGGER.info(step_description) - num_encounters = data[ENCOUNTER_ID].nunique() + num_encounters = data[col].nunique() if rows: num_samples = len(data) LOGGER.info("# samples: %d, # encounters: %d", num_samples, num_encounters) diff --git a/cyclops/utils/common.py b/cyclops/utils/common.py index af9324063..bd920c44a 100644 --- a/cyclops/utils/common.py +++ b/cyclops/utils/common.py @@ -70,7 +70,7 @@ def add_years_approximate( # Subtract 1 from potentially invalid leap days to avoid issues leap_days = (month == 2) & (day == 29) - data["day"][leap_days] -= 1 + data.loc[leap_days, "day"] -= 1 return pd.to_datetime(data) diff --git a/tests/cyclops/process/feature/test_feature.py b/tests/cyclops/process/feature/test_feature.py index 8a4dab20c..b1531e8cd 100644 --- a/tests/cyclops/process/feature/test_feature.py +++ b/tests/cyclops/process/feature/test_feature.py @@ -6,7 +6,6 @@ import numpy as np import pandas as pd -from cyclops.process.column_names import ENCOUNTER_ID from cyclops.process.constants import ( BINARY, FEATURE_MAPPING_ATTR, @@ -20,6 +19,9 @@ from cyclops.process.feature.normalize import GroupbyNormalizer +ENCOUNTER_ID = "enc_id" + + def test__feature_meta__get_type(): """Test FeatureMeta.get_type fn.""" feature_meta_numeric = FeatureMeta(**{FEATURE_TYPE_ATTR: NUMERIC}) diff --git a/tests/cyclops/process/feature/test_normalize.py b/tests/cyclops/process/feature/test_normalize.py index 02e99e91a..4bec6ba1a 100644 --- a/tests/cyclops/process/feature/test_normalize.py +++ b/tests/cyclops/process/feature/test_normalize.py @@ -4,13 +4,6 @@ import pandas as pd import pytest -from cyclops.process.column_names import ( - ENCOUNTER_ID, - EVENT_NAME, - EVENT_VALUE, - EVENT_VALUE_UNIT, - SUBJECT_ID, -) from cyclops.process.constants import MIN_MAX, STANDARD from cyclops.process.feature.normalize import ( GroupbyNormalizer, @@ -19,6 +12,13 @@ ) +ENCOUNTER_ID = "enc_id" +EVENT_NAME = "event_name" +EVENT_VALUE = "event_value" +EVENT_VALUE_UNIT = "event_value_unit" +SUBJECT_ID = "subject_id" + + @pytest.fixture() def test_input(): """Create a test input.""" diff --git a/tests/cyclops/process/test_aggregate.py b/tests/cyclops/process/test_aggregate.py index 739cffc44..9b35e104a 100644 --- a/tests/cyclops/process/test_aggregate.py +++ b/tests/cyclops/process/test_aggregate.py @@ -8,17 +8,14 @@ import pytest from pandas import Timestamp -from cyclops.process.aggregate import AGGFUNCS, Aggregator -from cyclops.process.column_names import ( - ENCOUNTER_ID, - EVENT_NAME, - EVENT_TIMESTAMP, - EVENT_VALUE, +from cyclops.process.aggregate import ( + AGGFUNCS, RESTRICT_TIMESTAMP, - START_TIMESTAMP, START_TIMESTEP, - STOP_TIMESTAMP, TIMESTEP, + WINDOW_START_TIMESTAMP, + WINDOW_STOP_TIMESTAMP, + Aggregator, ) from cyclops.process.constants import MEAN, MEDIAN @@ -27,6 +24,10 @@ DATE2 = datetime(2022, 11, 3, hour=14) DATE3 = datetime(2022, 11, 4, hour=3) DATE4 = datetime(2022, 11, 4, hour=13) +ENCOUNTER_ID = "enc_id" +EVENT_NAME = "event_name" +EVENT_VALUE = "event_value" +EVENT_TIMESTAMP = "event_timestamp" @pytest.fixture() @@ -275,8 +276,8 @@ def test_aggregate_one_group_outlier(): EVENT_VALUE, EVENT_TIMESTAMP, "some_str_col", - START_TIMESTAMP, - STOP_TIMESTAMP, + WINDOW_START_TIMESTAMP, + WINDOW_STOP_TIMESTAMP, TIMESTEP, ] diff --git a/tests/cyclops/process/test_clean.py b/tests/cyclops/process/test_clean.py index ed36f1307..aba24313e 100644 --- a/tests/cyclops/process/test_clean.py +++ b/tests/cyclops/process/test_clean.py @@ -1,19 +1,16 @@ """Test clean module.""" -from datetime import datetime import pandas as pd import pytest -from cyclops.process.clean import combine_events, convert_to_events, normalize_events -from cyclops.process.column_names import ( - ENCOUNTER_ID, - EVENT_CATEGORY, - EVENT_NAME, - EVENT_TIMESTAMP, - EVENT_VALUE, - EVENT_VALUE_UNIT, -) +from cyclops.process.clean import normalize_events + + +ENCOUNTER_ID = "enc_id" +EVENT_NAME = "event_name" +EVENT_VALUE = "event_value" +EVENT_VALUE_UNIT = "event_value_unit" @pytest.fixture() @@ -30,68 +27,6 @@ def test_event_data_normalized(): return input_ -def test_combine_events(): - """Test combine_events fn.""" - test_input1 = pd.DataFrame( - columns=[ENCOUNTER_ID, EVENT_TIMESTAMP, EVENT_NAME, EVENT_VALUE], - index=[0], - ) - test_input2 = pd.DataFrame( - columns=[ENCOUNTER_ID, EVENT_TIMESTAMP, EVENT_NAME, EVENT_VALUE], - index=[0, 1], - ) - test_input1.loc[0] = [12, datetime(2022, 11, 3, 12, 13), "eventA", 1.2] - test_input2.loc[0] = [14, datetime(2022, 11, 3, 1, 13), "eventB", 11.2] - test_input2.loc[1] = [12, datetime(2022, 11, 4, 12, 13), "eventA", 111.2] - - events = combine_events([test_input1, test_input2]) - assert len(events) == 3 - assert events.loc[2][EVENT_NAME] == "eventA" - - events = combine_events([test_input1]) - assert events.equals(test_input1) - events = combine_events(test_input1) - assert events.equals(test_input1) - - -def test_convert_to_events(): - """Test convert_to_events fn.""" - test_input = pd.DataFrame(columns=[ENCOUNTER_ID, "test_ts"], index=[0, 1, 2]) - test_input.loc[0] = [12, datetime(2022, 11, 3, 12, 13)] - test_input.loc[1] = [11, datetime(2022, 11, 3, 19, 13)] - test_input.loc[2] = [1, datetime(2022, 11, 2, 1, 1)] - events = convert_to_events( - test_input, - event_name="test", - event_category="test", - timestamp_col="test_ts", - ) - assert len(events) == 3 - assert events.loc[2][ENCOUNTER_ID] == 1 - assert events.loc[1][EVENT_TIMESTAMP] == datetime(2022, 11, 3, 19, 13) - - test_input = pd.DataFrame( - columns=[ENCOUNTER_ID, "test_ts", "test_value"], - index=[0, 1, 2], - ) - test_input.loc[0] = [12, datetime(2022, 11, 3, 12, 13), 1.2] - test_input.loc[1] = [11, datetime(2022, 11, 3, 19, 13), 2.34] - test_input.loc[2] = [1, datetime(2022, 11, 2, 1, 1), 11] - events = convert_to_events( - test_input, - event_name="test", - event_category="test", - timestamp_col="test_ts", - value_col="test_value", - ) - assert len(events) == 3 - assert events.loc[2][ENCOUNTER_ID] == 1 - assert events.loc[1][EVENT_TIMESTAMP] == datetime(2022, 11, 3, 19, 13) - assert events.loc[2][EVENT_VALUE][0] == 11 - assert events.loc[2][EVENT_NAME] == "test" - assert events.loc[2][EVENT_CATEGORY] == "test" - - @pytest.fixture() def test_event_data_unnormalized(): """Create event data test input with unnormalized event values.""" @@ -131,14 +66,24 @@ def test_normalize_events( test_event_data_normalized, ): """Test normalize_events fn.""" - normalized_events = normalize_events(test_event_data_normalized) + normalized_events = normalize_events( + test_event_data_normalized, + event_name_col=EVENT_NAME, + event_value_col=EVENT_VALUE, + event_value_unit_col=EVENT_VALUE_UNIT, + ) assert len(normalized_events[EVENT_NAME].unique()) == 3 assert len(normalized_events[EVENT_VALUE_UNIT].unique()) == 3 assert "test-a" in list(normalized_events[EVENT_NAME]) assert "unit-a" in list(normalized_events[EVENT_VALUE_UNIT]) - normalized_events = normalize_events(test_event_data_unnormalized) + normalized_events = normalize_events( + test_event_data_unnormalized, + event_name_col=EVENT_NAME, + event_value_col=EVENT_VALUE, + event_value_unit_col=EVENT_VALUE_UNIT, + ) assert normalized_events[EVENT_VALUE][0] == 1.0 assert normalized_events[EVENT_VALUE][1] == 1.4 assert normalized_events[EVENT_VALUE][2] == 1.2 diff --git a/tests/cyclops/process/test_impute.py b/tests/cyclops/process/test_impute.py index 8ead0e8eb..11355c1e4 100644 --- a/tests/cyclops/process/test_impute.py +++ b/tests/cyclops/process/test_impute.py @@ -5,7 +5,6 @@ import pandas as pd import pytest -from cyclops.process.column_names import ENCOUNTER_ID, TIMESTEP from cyclops.process.constants import ( BFILL, DROP, @@ -32,6 +31,10 @@ ) +ENCOUNTER_ID = "enc_id" +TIMESTEP = "timestep" + + @pytest.fixture() def test_tabular(): """Create test input tabular features.""" diff --git a/tests/cyclops/process/test_util.py b/tests/cyclops/process/test_util.py index 21a684131..5395733e3 100644 --- a/tests/cyclops/process/test_util.py +++ b/tests/cyclops/process/test_util.py @@ -1,13 +1,11 @@ """Test processor utility functions.""" -from typing import Optional import numpy as np import pandas as pd import pytest from cyclops.process.util import ( - assert_has_columns, create_indicator_variables, gather_columns, has_columns, @@ -40,71 +38,6 @@ def test_has_columns(): has_columns(test_input, ["B", "C"], exactly=True, raise_error=True) -def test_assert_has_columns(): - """Test assert_has_columns decorator.""" - - @assert_has_columns( - ["A", "B"], - None, # No check on df2 - ["Pizza"], - df_kwarg=["sauce", "please"], - ) - def test( - df1: pd.DataFrame, - some_int: int, - df2: pd.DataFrame, - some_str: str, - df3: pd.DataFrame, - int_keyword: Optional[int] = None, - df_kwarg: Optional[pd.DataFrame] = None, - ) -> None: - return None - - df1 = pd.DataFrame(columns=["A", "B", "C"]) - some_int = 1 - df2 = pd.DataFrame(columns=["C", "D"]) - some_str = "A" - df3 = pd.DataFrame(columns=["Pizza", "is", "yummy"]) - int_keyword = 2 - df_kwarg = pd.DataFrame(columns=["Extra", "sauce", "please"]) - - # Passing tests - test(df1, some_int, df2, some_str, df3, int_keyword=int_keyword, df_kwarg=df_kwarg) - test(df1, some_int, df2, some_str, df3, df_kwarg=df_kwarg) - - # Failing tests - df1_fail = pd.DataFrame(columns=["A", "C"]) - try: - test( - df1_fail, - some_int, - df2, - some_str, - df3, - int_keyword=int_keyword, - df_kwarg=df_kwarg, - ) - raise AssertionError - except ValueError as error: - assert "B" in str(error) - - df_kwarg_fail = pd.DataFrame(columns=["hiya"]) - - try: - test( - df1, - some_int, - df2, - some_str, - df3, - int_keyword=int_keyword, - df_kwarg=df_kwarg_fail, - ) - raise AssertionError - except ValueError as error: - assert "sauce" in str(error) - - def test_gather_columns(): """Test gather_columns fn.""" test_input = pd.DataFrame(index=[0, 1], columns=["A", "B", "C"])