From e841e7d3c813fa0213b66bd8e7185fd940739c29 Mon Sep 17 00:00:00 2001 From: Wes Date: Fri, 18 Oct 2024 09:02:34 -0400 Subject: [PATCH 1/2] updates to forward traversal --- graphreduce/graph_reduce.py | 36 +++++++++++++++++++++++++++++------- graphreduce/node.py | 21 +++++++++++++++------ 2 files changed, 44 insertions(+), 13 deletions(-) diff --git a/graphreduce/graph_reduce.py b/graphreduce/graph_reduce.py index f96e6df..e0c4727 100644 --- a/graphreduce/graph_reduce.py +++ b/graphreduce/graph_reduce.py @@ -15,7 +15,6 @@ from structlog import get_logger import pyspark import pyvis -import woodwork as ww # internal from graphreduce.node import GraphReduceNode, DynamicNode, SQLNode @@ -25,7 +24,6 @@ logger = get_logger('GraphReduce') - class GraphReduce(nx.DiGraph): def __init__( self, @@ -62,6 +60,8 @@ def __init__( # Only for SQL engines. lazy_execution: bool = False, + # Debug + debug: bool = False, *args, **kwargs ): @@ -83,6 +83,10 @@ def __init__( auto_feature_hops_back: optional for automatically computing features auto_feature_hops_front: optional for automatically computing features feature_typefunc_map : optional mapping from type to a list of functions (e.g., {'int' : ['min', 'max', 'sum'], 'str' : ['first']}) + label_node: optionl GraphReduceNode for the label + label_operation: optional str or callable operation to call to compute the label + label_field: optional str field to compute the label + debug: bool whether to run debug logging """ super(GraphReduce, self).__init__(*args, **kwargs) @@ -116,6 +120,8 @@ def __init__( # if using Spark self._sqlctx = spark_sqlctx self._storage_client = storage_client + + self.debug = debug if self.compute_layer == ComputeLayerEnum.spark and self._sqlctx is None: raise Exception(f"Must provide a `spark_sqlctx` kwarg if using {self.compute_layer.value} as compute layer") @@ -451,15 +457,24 @@ def traverse_up ( """ parents = [(start, n, 1) for n in self.predecessors(start)] to_traverse = [(n, 1) for n in self.predecessors(start)] - while len(to_traverse): + cur_level = 1 + while len(to_traverse) and cur_level <= self.auto_feature_hops_front: cur_node, cur_level = to_traverse[0] del to_traverse[0] for node in self.predecessors(cur_node): - parents.append((cur_node, node, cur_level+1)) - to_traverse.append((node, cur_level+1)) - - return parents + if cur_level+1 <= self.auto_feature_hops_front: + parents.append((cur_node, node, cur_level+1)) + to_traverse.append((node, cur_level+1)) + # Returns higher levels first so that + # when we iterate through these edges + # we will traverse from top to bottom + # where the bottom is our `start`. + parents_ordered = list(reversed(parents)) + if self.debug: + for ix in range(len(parents_ordered)): + logger.debug(f"index {ix} is level {parents_ordered[ix][-1]}") + return parents_ordered def get_children ( @@ -720,6 +735,13 @@ def do_transformations(self): if self.auto_features: for to_node, from_node, level in self.traverse_up(start=self.parent_node): if self.auto_feature_hops_front and level <= self.auto_feature_hops_front: + # It is assumed that front-facing relations + # are not one to many and therefore we + # won't have duplication on the join. + # This may be an incorrect assumption + # so this implementation is currently brittle. 
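As an illustration of the fan-out risk the comment above describes, a front-facing join only stays safe while the join key on the front table is unique. A minimal sketch of a guard one could run before such a join, not part of this patch, with hypothetical frame and column names:

import pandas as pd

def assert_no_front_fanout(front_df: pd.DataFrame, join_key: str) -> None:
    # A repeated key means the front join would duplicate rows on the other side.
    dupes = int(front_df[join_key].duplicated().sum())
    if dupes:
        raise ValueError(f"front relation is one-to-many on '{join_key}': {dupes} duplicated keys")

# Hypothetical usage before joining a customers front table onto orders:
customers = pd.DataFrame({'cust_id': [1, 2, 3], 'segment': ['a', 'b', 'a']})
assert_no_front_fanout(customers, 'cust_id')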
+ if self.debug: + logger.debug(f'Performing an auto_features front join from {from_node} to {to_node}') joined_df = self.join_any( to_node, from_node diff --git a/graphreduce/node.py b/graphreduce/node.py index 93b2c3a..a40c58b 100644 --- a/graphreduce/node.py +++ b/graphreduce/node.py @@ -14,7 +14,6 @@ import pyspark from structlog import get_logger from dateutil.parser import parse as date_parse -import woodwork as ww # internal from graphreduce.enum import ComputeLayerEnum, PeriodUnit, SQLOpType @@ -85,6 +84,9 @@ def __init__ ( checkpoints: list = [], # Only for SQL dialects at the moment. lazy_execution: bool = False, + # Read encoding. + delimiter: str = None, + encoding: str = None, ): """ Constructor @@ -112,6 +114,10 @@ def __init__ ( self.spark_sqlctx = spark_sqlctx self.columns = columns + # Read options + self.delimiter = delimiter if delimiter else ',' + self.encoding = encoding + # Lazy execution for the SQL nodes. self._lazy_execution = lazy_execution self._storage_client = storage_client @@ -168,11 +174,14 @@ def do_data ( if self.compute_layer.value == 'pandas': if not hasattr(self, 'df') or (hasattr(self,'df') and not isinstance(self.df, pd.DataFrame)): - self.df = getattr(pd, f"read_{self.fmt}")(self.fpath) + if self.encoding and self.delimiter: + self.df = getattr(pd, f"read_{self.fmt}")(self.fpath, encoding=self.encoding, delimiter=self.delimiter) + else: + self.df = getattr(pd, f"read_{self.fmt}")(self.fpath) # Initialize woodwork. - self.df.ww.init() - self._logical_types = self.df.ww.logical_types + #self.df.ww.init() + #self._logical_types = self.df.ww.logical_types # Rename columns with prefixes. if len(self.columns): @@ -185,8 +194,8 @@ def do_data ( self.df = getattr(dd, f"read_{self.fmt}")(self.fpath) # Initialize woodwork. - self.df.ww.init() - self._logical_types = self.df.ww.logical_types + #self.df.ww.init() + #self._logical_types = self.df.ww.logical_types # Rename columns with prefixes. 
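The new `delimiter` and `encoding` kwargs above only take effect when both are set, since the pandas branch checks `if self.encoding and self.delimiter`. A hedged sketch of constructing a node that uses them; the file path and keys are hypothetical and other node kwargs are left at their defaults:

from graphreduce.node import DynamicNode
from graphreduce.enum import ComputeLayerEnum

cust = DynamicNode(
    fpath='data/customers.csv',   # hypothetical pipe-delimited, latin-1 file
    fmt='csv',
    pk='id',
    prefix='cust',
    compute_layer=ComputeLayerEnum.pandas,
    delimiter='|',
    encoding='latin-1',
)
cust.do_data()   # resolves to pd.read_csv(fpath, encoding='latin-1', delimiter='|')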
if len(self.columns): From f98105f1c1309989b51185491db06953f9c1b45f Mon Sep 17 00:00:00 2001 From: Wes Date: Wed, 30 Oct 2024 10:47:00 -0700 Subject: [PATCH 2/2] updates to graphreduce --- graphreduce/common.py | 100 ++++++++++++++++++++++ graphreduce/graph_reduce.py | 73 +++++++++++----- graphreduce/node.py | 161 +++++++++++++++++++++++++++++++----- 3 files changed, 294 insertions(+), 40 deletions(-) create mode 100644 graphreduce/common.py diff --git a/graphreduce/common.py b/graphreduce/common.py new file mode 100644 index 0000000..6051cfb --- /dev/null +++ b/graphreduce/common.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python + +import pytz +from datetime import datetime +import pandas as pd +import dask.dataframe as dd +from pyspark.sql import functions as F +import pyspark +from torch_frame import stype + + +stype_map = { + 'numerical': [ + 'min', + 'max', + 'median', + 'mean', + 'sum', + ], + 'categorical': [ + 'nunique', + 'count', + 'mode', + ], + 'text_embedded': [ + 'length' + ], + 'text_tokenized': [ + 'length' + ], + 'multicategorical': [ + 'length' + ], + 'sequence_numerical': [ + 'sum', + 'min', + 'max', + 'median', + ], + 'timestamp': [ + 'min', + 'max', + 'delta' + ], + 'image_embedded': [], + 'embedding': [] +} + + +def clean_datetime_pandas(df: pd.DataFrame, col: str) -> pd.DataFrame: + df[col] = pd.to_datetime(df[col], errors="coerce", utc=True) + + # Count the number of rows before removing invalid dates + total_before = len(df) + + # Remove rows where timestamp is NaT (indicating parsing failure) + df = df.dropna(subset=[col]) + + # Count the number of rows after removing invalid dates + total_after = len(df) + + # Calculate the percentage of rows removed + percentage_removed = ((total_before - total_after) / total_before) * 100 + + # Print the percentage of comments removed + print( + f"Percentage of rows removed due to invalid dates: " + f"{percentage_removed:.2f}%" + ) + return df + + +def clean_datetime_dask(df: dd.DataFrame, col: str) -> dd.DataFrame: + df[col] = dd.to_datetime(df[col]) + total_before = len(df) + df = df.dropna(subset=[col]) + total_after = len(df) + percentage_removed = ((total_before - total_after) / total_before) * 100 + return df + + +def clean_datetime_spark(df, col: str) -> pyspark.sql.DataFrame: + pass + + + +def convert_to_utc(dt): + """Converts a datetime object to UTC. + + Args: + dt: The datetime object to convert. + + Returns: + The datetime object converted to UTC. 
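    Example (illustrative, not in the original patch; relies on the
    hard-coded US/Pacific fallback below for naive datetimes):

        >>> from datetime import datetime
        >>> convert_to_utc(datetime(2024, 1, 1, 12, 0))
        datetime.datetime(2024, 1, 1, 20, 0, tzinfo=<UTC>)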
+ """ + if dt.tzinfo is None: # Naive datetime + # Assuming the original timezone is the local system time + local_tz = pytz.timezone('US/Pacific') # Replace with the actual timezone if known + dt = local_tz.localize(dt) + return dt.astimezone(pytz.UTC) diff --git a/graphreduce/graph_reduce.py b/graphreduce/graph_reduce.py index e0c4727..1590bc9 100644 --- a/graphreduce/graph_reduce.py +++ b/graphreduce/graph_reduce.py @@ -38,18 +38,28 @@ def __init__( auto_feature_hops_back: int = 2, auto_feature_hops_front: int = 1, feature_typefunc_map : typing.Dict[str, typing.List[str]] = { - 'int64' : ['count'], + 'int64' : ['median', 'mean', 'sum', 'min', 'max'], 'str' : ['min', 'max', 'count'], #'object' : ['first', 'count'], 'object': ['count'], - 'float64' : ['min', 'max', 'sum'], + 'float64' : ['median', 'min', 'max', 'sum', 'mean'], + 'float32': ['median','min','max','sum','mean'], #'bool' : ['first'], #'datetime64' : ['first', 'min', 'max'], 'datetime64': ['min', 'max'], 'datetime64[ns]': ['min', 'max'], }, + feature_stype_map: typing.Dict[str, typing.List[str]] = { + 'numerical': ['median', 'mean', 'sum', 'min', 'max'], + 'categorical': ['count', 'nunique'], + 'embedding': ['first'], + 'image_embedded': ['first'], + 'multicategorical': ['mode'], + 'sequence_numerical': ['min', 'max'], + 'timestamp': ['min','max'] + }, # Label parameters. - label_node: typing.Optional[GraphReduceNode] = None, + label_node: typing.Optional[typing.Union[GraphReduceNode, typing.List[GraphReduceNode]]] = None, label_operation: typing.Optional[typing.Union[callable, str]] = None, # Field on the node. label_field: typing.Optional[str] = None, @@ -113,6 +123,7 @@ def __init__( self.auto_feature_hops_back = auto_feature_hops_back self.auto_feature_hops_front = auto_feature_hops_front self.feature_typefunc_map = feature_typefunc_map + self.feature_stype_map = feature_stype_map # SQL dialect parameters. 
self._lazy_execution = lazy_execution @@ -216,6 +227,8 @@ def hydrate_graph_data ( Hydrate the nodes in the graph with their data """ for node in self.nodes(): + if self.debug: + logger.debug(f'hydrating {node} data') node.do_data() @@ -227,11 +240,14 @@ def add_entity_edge ( relation_key : str, # need to enforce this better relation_type : str = 'parent_child', - reduce : bool = True + reduce : bool = True, + reduce_after_join: bool = False, ): """ Add an entity relation """ + if reduce and reduce_after_join: + raise Exception(f'only one can be true: `reduce` or `reduce_after_join`') if not self.has_edge(parent_node, relation_node): self.add_edge( parent_node, @@ -240,7 +256,8 @@ def add_entity_edge ( 'parent_key' : parent_key, 'relation_key' : relation_key, 'relation_type' : relation_type, - 'reduce' : reduce + 'reduce' : reduce, + 'reduce_after_join': reduce_after_join } ) @@ -610,7 +627,8 @@ def do_transformations_sql(self): sql_ops = relation_node.auto_features( reduce_key=edge_data['relation_key'], - type_func_map=self.feature_typefunc_map, + #type_func_map=self.feature_typefunc_map, + type_func_map=self.feature_stype_map, compute_layer=self.compute_layer ) logger.info(f"{sql_ops}") @@ -619,7 +637,8 @@ def do_transformations_sql(self): relation_node.build_query( relation_node.auto_features( reduce_key=edge_data['relation_key'], - type_func_map=self.feature_typefunc_map, + #type_func_map=self.feature_typefunc_map, + type_func_map=self.feature_stype_map, compute_layer=self.compute_layer ) ), @@ -634,8 +653,6 @@ def do_transformations_sql(self): reduce_sql = relation_node.build_query(reduce_ops) logger.info(f"reduce SQL: {reduce_sql}") reduce_ref = relation_node.create_ref(reduce_sql, relation_node.do_reduce) - - else: # in this case we will join the entire relation's dataframe logger.info(f"doing nothing with relation node {relation_node}") @@ -649,13 +666,12 @@ def do_transformations_sql(self): ) # Target variables. - if self.label_node and self.label_node == relation_node: + if self.label_node and (self.label_node == relation_node or relation_node.label_field is not None): logger.info(f"Had label node {self.label_node}") # Get the reference right before `do_reduce` # so the records are not aggregated yet. - data_ref = relation_node.get_ref_name(relation_node.do_filters, lookup=True) - + data_ref = relation_node.get_ref_name(relation_node.do_filters, lookup=True) #TODO: don't need to reduce if it's 1:1 cardinality. if self.auto_features: @@ -741,7 +757,7 @@ def do_transformations(self): # This may be an incorrect assumption # so this implementation is currently brittle. 
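The `reduce_after_join` flag added above is mutually exclusive with `reduce`: setting both raises an exception, and setting only `reduce_after_join` defers aggregation until after the join. A hedged usage sketch with hypothetical node variables and keys:

# `cust` and `orders` are nodes already added to the graph `gr`.
gr.add_entity_edge(
    parent_node=cust,
    relation_node=orders,
    parent_key='id',
    relation_key='customer_id',
    relation_type='parent_child',
    reduce=False,
    reduce_after_join=True,   # aggregate after the join (via do_post_join_reduce) instead of before
)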
if self.debug: - logger.debug(f'Performing an auto_features front join from {from_node} to {to_node}') + logger.debug(f'Performing FRONT auto_features front join from {from_node} to {to_node}') joined_df = self.join_any( to_node, from_node @@ -761,10 +777,11 @@ def do_transformations(self): join_df = relation_node.do_reduce(edge_data['relation_key']) # only relevant when reducing if self.auto_features: - logger.info(f"performing auto_features on node {relation_node}") + logger.info(f"performing auto_features on node {relation_node} with reduce key {edge_data['relation_key']}") child_df = relation_node.auto_features( reduce_key=edge_data['relation_key'], - type_func_map=self.feature_typefunc_map, + #type_func_map=self.feature_typefunc_map, + type_func_map=self.feature_stype_map, compute_layer=self.compute_layer ) @@ -779,6 +796,8 @@ def do_transformations(self): ) else: join_df = child_df + if self.debug: + logger.debug(f'assigned join_df to be {child_df.columns}') elif self.compute_layer == ComputeLayerEnum.spark: if isinstance(join_df, pyspark.sql.dataframe.DataFrame): join_df = join_df.join( @@ -788,6 +807,8 @@ def do_transformations(self): ) else: join_df = child_df + if self.debug: + logger.debug(f'assigned join_df to be {child_df.columns}') else: # in this case we will join the entire relation's dataframe @@ -805,14 +826,24 @@ def do_transformations(self): parent_node.df = joined_df # Target variables. - if self.label_node and self.label_node == relation_node: + if self.label_node and (self.label_node == relation_node or relation_node.label_field is not None): logger.info(f"Had label node {self.label_node}") + # Automatic label generation. if isinstance(relation_node, DynamicNode): - label_df = relation_node.default_label( + if self.label_node == relation_node: + label_df = relation_node.default_label( op=self.label_operation, field=self.label_field, reduce_key=edge_data['relation_key'] - ) + ) + elif relation_node.label_field is not None: + label_df = relation_node.default_label( + op=relation_node.label_operation if relation_node.label_operation else 'count', + field=relation_node.label_field, + reduce_key=edge_data['relation_key'] + ) + # There should be an implementation of `do_labels` + # when the instance is a `GraphReduceNode`. 
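To illustrate the comment above, a minimal sketch of a `GraphReduceNode` subclass that supplies `do_labels`; the column names are hypothetical and the other abstract methods are omitted for brevity:

import pandas as pd
from graphreduce.node import GraphReduceNode

class OrderNode(GraphReduceNode):
    # do_filters, do_annotate, and the other node methods would also be implemented.
    def do_labels(self, reduce_key: str):
        # One row per reduce key: count of records falling in the label period.
        df = self.prep_for_labels()
        return df.groupby(self.colabbr(reduce_key)).agg(
            **{self.colabbr('num_orders_label'): pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='count')}
        ).reset_index()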
elif isinstance(relation_node, GraphReduceNode): label_df = relation_node.do_labels(edge_data['relation_key']) @@ -829,4 +860,8 @@ def do_transformations(self): parent_node.do_post_join_annotate() # post-join filters (if any) if hasattr(parent_node, 'do_post_join_filters'): - parent_node.do_post_join_filters() + parent_node.do_post_join_filters() + + # post-join aggregation + if edge_data['reduce_after_join']: + parent_node.do_post_join_reduce(edge_data['relation_key'], type_func_map=self.feature_stype_map) diff --git a/graphreduce/node.py b/graphreduce/node.py index a40c58b..eeca994 100644 --- a/graphreduce/node.py +++ b/graphreduce/node.py @@ -14,11 +14,18 @@ import pyspark from structlog import get_logger from dateutil.parser import parse as date_parse +from torch_frame import stype +from torch_frame.utils import infer_df_stype # internal from graphreduce.enum import ComputeLayerEnum, PeriodUnit, SQLOpType from graphreduce.storage import StorageClient from graphreduce.models import sqlop +from graphreduce.common import ( + clean_datetime_pandas, + clean_datetime_dask, + clean_datetime_spark +) logger = get_logger('Node') @@ -78,6 +85,7 @@ def __init__ ( label_period_val : typing.Optional[typing.Union[int, float]] = None, label_period_unit : typing.Optional[PeriodUnit] = None, label_field : typing.Optional[str] = None, + label_operation: typing.Optional[typing.Union[str, callable]] = None, spark_sqlctx : pyspark.sql.SQLContext = None, columns : list = [], storage_client: typing.Optional[StorageClient] = None, @@ -87,6 +95,7 @@ def __init__ ( # Read encoding. delimiter: str = None, encoding: str = None, + ts_data: bool = False, ): """ Constructor @@ -111,6 +120,7 @@ def __init__ ( self.label_period_val = label_period_val self.label_period_unit = label_period_unit self.label_field = label_field + self.label_operation = label_operation self.spark_sqlctx = spark_sqlctx self.columns = columns @@ -124,10 +134,12 @@ def __init__ ( # List of merged neighbor classes. self._merged = [] # List of checkpoints. - + # Logical types of the original columns from `woodwork`. self._logical_types = {} + self._stypes = {} + if not self.date_key: logger.warning(f"no `date_key` set for {self}") @@ -138,7 +150,7 @@ def __repr__ ( """ Instance representation """ - return f"" + return f"" def __str__ ( self @@ -146,7 +158,46 @@ def __str__ ( """ Instances string """ - return f"" + return f"" + + + def _is_identifier ( + self, + col: str + ) -> bool: + """ +Check if a column is an identifier. + """ + if col.lower() == 'id': + return True + elif col.lower().split('_')[-1].endswith('id'): + return True + elif col.lower() == 'uuid': + return True + elif col.lower() == 'guid': + return True + elif col.lower() == 'identifier': + return True + + + def is_ts_data ( + self, + reduce_key: str = None, + ) -> bool: + """ +Determines if the data is timeseries. 
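    Note (illustrative): the heuristic below treats the node as time-series
    when a `date_key` is configured and the number of distinct `reduce_key`
    values is less than 90% of the row count, i.e. entities typically have
    several dated rows.

    Example with a hypothetical pandas frame (three rows per customer):

        >>> df = pd.DataFrame({'cust_id': [1, 1, 1, 2, 2, 2], 'id': list(range(6))})
        >>> df.groupby('cust_id').size().shape[0] / len(df)
        0.3333333333333333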
+ """ + if self.date_key: + if self.compute_layer == ComputeLayerEnum.pandas or self.compute_layer == ComputeLayerEnum.dask: + grouped = self.df.groupby(self.colabbr(reduce_key)).agg({self.colabbr(self.pk):'count'}) + if len(grouped) / len(self.df) < 0.9: + return True + elif self.compute_layer == ComputeLayerEnum.spark: + grouped = self.df.groupBy(self.colabbr(reduce_key)).agg(F.count(self.colabbr(self.pk))).count() + n = self.df.count() + if float(grouped) / float(n) < 0.9: + return True + return False def reload ( @@ -177,8 +228,7 @@ def do_data ( if self.encoding and self.delimiter: self.df = getattr(pd, f"read_{self.fmt}")(self.fpath, encoding=self.encoding, delimiter=self.delimiter) else: - self.df = getattr(pd, f"read_{self.fmt}")(self.fpath) - + self.df = getattr(pd, f"read_{self.fmt}")(self.fpath) # Initialize woodwork. #self.df.ww.init() #self._logical_types = self.df.ww.logical_types @@ -188,11 +238,12 @@ def do_data ( self.df = self.df[[c for c in self.columns]] self.columns = list(self.df.columns) self.df.columns = [f"{self.prefix}_{c}" for c in self.df.columns] + # Infer the semantic type with `torch_frame`. + self._stypes = infer_df_stype(self.df.head(100)) elif self.compute_layer.value == 'dask': if not hasattr(self, 'df') or (hasattr(self, 'df') and not isinstance(self.df, dd.DataFrame )): self.df = getattr(dd, f"read_{self.fmt}")(self.fpath) - # Initialize woodwork. #self.df.ww.init() #self._logical_types = self.df.ww.logical_types @@ -202,6 +253,8 @@ def do_data ( self.df = self.df[[c for c in self.columns]] self.columns = list(self.df.columns) self.df.columns = [f"{self.prefix}_{c}" for c in self.df.columns] + # Infer the semantic type with `torch_frame`. + self._stypes = infer_df_stype(self.df.head()) elif self.compute_layer.value == 'spark': if not hasattr(self, 'df') or (hasattr(self, 'df') and not isinstance(self.df, pyspark.sql.DataFrame)): if self.dialect == 'python': @@ -214,6 +267,8 @@ def do_data ( for c in self.df.columns: self.df = self.df.withColumnRenamed(c, f"{self.prefix}_{c}") + # Infer the semantic type with `torch_frame`. + self._stypes = infer_df_stype(self.df.head(100).toPandas()) # at this point of connectors we may want to try integrating # with something like fugue: https://github.com/fugue-project/fugue elif self.compute_layer.value == 'ray': @@ -271,6 +326,14 @@ def do_post_join_filters(self): pass + def do_post_join_reduce(self, reduce_key:str): + """ +Implementation for reduce operations +after a join. + """ + pass + + def auto_features ( self, reduce_key : str, @@ -331,15 +394,79 @@ def pandas_auto_features ( definitions. """ agg_funcs = {} - for col, _type in dict(self.df.dtypes).items(): - _type = str(_type) - if type_func_map.get(_type): + + ts_data = self.is_ts_data(reduce_key) + if ts_data: + # Make sure the dates are cleaned. + self.df = clean_datetime_pandas(self.df, self.colabbr(self.date_key)) + # First sort the data by dates. + self.df = self.df.sort_values(self.colabbr(self.date_key), ascending=True) + self.df[f'prev_{self.colabbr(self.date_key)}'] = self.df.groupby(self.colabbr(reduce_key))[self.colabbr(self.date_key)].shift(1) + # Get the time between the two different records. + self.df[self.colabbr('time_between_records')] = self.df.apply( + lambda x: (x[self.colabbr(self.date_key)]-x[f'prev_{self.colabbr(self.date_key)}']).total_seconds(), + axis=1) + + for col, stype in self._stypes.items(): + _type = str(stype) + if self._is_identifier(col) and col != reduce_key: + # We only perform counts for identifiers. 
+ agg_funcs[f'{col}_count'] = pd.NamedAgg(column=col, aggfunc='count') + elif self._is_identifier(col) and col == reduce_key: + continue + elif type_func_map.get(_type): for func in type_func_map[_type]: + if (_type == 'numerical' or 'timestamp') and dict(self.df.dtypes)[col].__str__() == 'object': + logger.info(f'skipped aggregation on {col} because semantic numerical but physical object') + continue col_new = f"{col}_{func}" agg_funcs[col_new] = pd.NamedAgg(column=col, aggfunc=func) - return self.prep_for_features().groupby(self.colabbr(reduce_key)).agg( + if not len(agg_funcs): + logger.info(f'No aggregations for {self}') + return self.df + + grouped = self.prep_for_features().groupby(self.colabbr(reduce_key)).agg( **agg_funcs ).reset_index() + if not len(grouped): + return None + # If we have time-series data take the time + # since the last event and the cut date. + if ts_data: + logger.info(f'computed post-aggregation features for {self}') + def is_tz_aware(series): + return series.dt.tz is not None + if is_tz_aware(grouped[f'{self.colabbr(self.date_key)}_max']): + grouped[f'{self.colabbr(self.date_key)}_max'] = grouped[f'{self.colabbr(self.date_key)}_max'].dt.tz_localize(None) + + grouped[self.colabbr('time_since_last_event')] = grouped.apply(lambda x: + (self.cut_date - x[f'{self.colabbr(self.date_key)}_max']).total_seconds(), + axis=1) + + # Number of events in last strata of time + days = [30, 60, 90, 365, 730] + for d in days: + if d > self.compute_period_val: + continue + feat_prepped = self.prep_for_features() + if is_tz_aware(feat_prepped[self.colabbr(self.date_key)]): + feat_prepped[self.colabbr(self.date_key)] = feat_prepped[self.colabbr(self.date_key)].dt.tz_localize(None) + + feat_prepped[self.colabbr('time_since_cut')] = feat_prepped.apply( + lambda x: (self.cut_date - x[self.colabbr(self.date_key)]).total_seconds()/86400, + axis=1) + sub = feat_prepped[ + (feat_prepped[self.colabbr('time_since_cut')] >= 0) + & + (feat_prepped[self.colabbr('time_since_cut')] <= d) + ] + days_group = sub.groupby(self.colabbr(reduce_key)).agg(**{ + self.colabbr(f'{d}d_num_events'): pd.NamedAgg(aggfunc='count', column=self.colabbr(self.pk)) + }).reset_index() + # join this back to the main dataset. + grouped = grouped.merge(days_group, on=self.colabbr(reduce_key), how='left') + logger.info(f'merged all ts groupings to {self}') + return grouped def dask_auto_features ( @@ -467,7 +594,7 @@ def pandas_auto_labels ( """ agg_funcs = {} for col, _type in dict(self.df.dtypes).items(): - if col.endswith('_label'): + if col.endswith('_label') or col == self.label_field or col == f'{self.colabbr(self.label_field)}': _type = str(_type) if type_func_map.get(_type): for func in type_func_map[_type]: @@ -655,7 +782,6 @@ def prep_for_features ( | (self.df[self.colabbr(self.date_key)].isNull()) ) - # SQL engine. elif not hasattr(self, 'df'): return None @@ -712,7 +838,6 @@ def prep_for_labels ( return self.df - def default_label ( self, op: typing.Union[str, callable], @@ -785,7 +910,6 @@ def on_demand_features ( - class DynamicNode(GraphReduceNode): """ A dynamic architecture for entities with no logic @@ -830,9 +954,6 @@ def do_labels(self, reduce_key: str): pass - - - class GraphReduceQueryException(Exception): pass @@ -928,7 +1049,6 @@ def create_ref ( based on the method being called. """ - # No reference has been created for this method. fn = fn if isinstance(fn, str) else fn.__name__ @@ -993,8 +1113,7 @@ def get_sample ( """ Gets a sample of rows for the current table or a parameterized table. 
- """ - + """ samp_query = """ SELECT * FROM {table} @@ -1161,7 +1280,7 @@ def do_filters(self) -> typing.Union[sqlop, typing.List[sqlop]]: #] return None - + # Returns aggregate functions # Returns aggregate def do_reduce(self, reduce_key) -> typing.Union[sqlop, typing.List[sqlop]]: