Commit bbeeb9d: update readme

wesmadrigal committed Jul 8, 2024 (1 parent 66c8ff4)

Showing 2 changed files with 105 additions and 101 deletions.
README.md: 103 additions & 99 deletions

…there isn't a nice ML-ready vector waiting for us, so we must curate
the data by joining many tables together to flatten them into a vector.
This is the problem `graphreduce` sets out to solve.

## Prior work
* [Deep Feature Synthesis](https://www.maxkanter.com/papers/DSAA_DSM_2015.pdf)
* [One Button Machine (IBM)](https://arxiv.org/abs/1706.00327)
* [autofeat (BASF)](http://arxiv.org/pdf/1901.07329)
* [featuretools (inspired by Deep Feature Synthesis)](https://github.com/alteryx/featuretools)

## Shortcomings of prior work
* point-in-time correctness is often not handled well
* Deep Feature Synthesis and `featuretools` are limited to `pandas` and a couple of SQL databases
* One Button Machine from IBM uses `spark`, but no implementation beyond the paper could be found
* none of the prior implementations allow for custom computational graphs or additional third-party libraries

## We extend prior work and add the following functionality:
* point-in-time correctness on arbitrarily large computational graphs (see the sketch below)
* extensible computational layers, with support currently spanning: `pandas`, `dask`, `spark`, AWS Athena, AWS Redshift, Snowflake, postgresql, mysql, and more coming
* customizable node implementations for a mix of dynamic and custom feature engineering with the ability to use third party libraries for portions (e.g., [cleanlab](https://github.com/cleanlab/cleanlab) for cleaning)
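
To make the first point concrete, here is a minimal sketch of point-in-time filtering in plain `pandas`; the file path and `ts` column are hypothetical, and `graphreduce` applies this windowing internally rather than through these exact calls:

```python
import pandas as pd

# Minimal sketch of point-in-time correctness (hypothetical column names).
cut_date = pd.Timestamp('2023-05-06')
compute_period = pd.Timedelta(days=365)
label_period = pd.Timedelta(days=30)

df = pd.read_csv('dat/orders.csv', parse_dates=['ts'])

# Feature rows: only the window leading up to the cut date,
# so no feature can leak information from after the cut date.
feature_rows = df[(df['ts'] > cut_date - compute_period) & (df['ts'] <= cut_date)]

# Label rows: strictly after the cut date, within the label period.
label_rows = df[(df['ts'] > cut_date) & (df['ts'] <= cut_date + label_period)]
```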


An example dataset might look like the following:

![schema](https://github.com/wesmadrigal/graphreduce/blob/master/docs/graph_reduce_example.png?raw=true)
1. Define the node-level interface and operations
```python
import datetime

import pandas as pd

from graphreduce.node import GraphReduceNode, DynamicNode
from graphreduce.enum import ComputeLayerEnum, PeriodUnit
from graphreduce.graph_reduce import GraphReduce

# Convention over configuration requires that we
# define boilerplate code for every entity / node
# we will compute over.
class CustomerNode(GraphReduceNode):
def do_annotate(self):
pass

def do_filters(self):
# Apply a filter operation on a hypothetical column `is_fake`.
# The `colabbr` method makes sure to prefix the column with
# the class or instance prefix.
self.df = self.df[self.df[self.colabbr('is_fake')] == False]

def do_normalize(self):
pass

def do_post_join_annotate(self):
pass

def do_reduce(self, reduce_key, *args, **kwargs):
pass

def do_labels(self, reduce_key, *args, **kwargs):
pass


class OrderNode(GraphReduceNode):
def do_annotate(self):
pass

def do_filters(self):
pass

def do_normalize(self):
pass

def do_post_join_annotate(self):
pass

def do_reduce(self, reduce_key):
# The `prep_for_features` method ensures no leakage
# prior to the compute period or after the cut date.
return self.prep_for_features().groupby(self.colabbr(reduce_key)).agg(
**{
self.colabbr(f'{self.pk}_count') : pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='count')
}
).reset_index()

    def do_labels(self, reduce_key, *args, **kwargs):
pass
```
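
For intuition about the two helpers used above: `colabbr` prefixes a column name with the node's prefix so that columns stay namespaced after joins, and `prep_for_features` filters the node's dataframe to the compute window ending at the cut date. A small sketch of the naming behavior (consistent with the comments above, not a full run):

```python
cust = CustomerNode(pk='id', prefix='cust', fpath='dat/cust.csv', fmt='csv',
                    compute_layer=ComputeLayerEnum.pandas)

cust.colabbr('is_fake')  # -> 'cust_is_fake'
cust.colabbr(cust.pk)    # -> 'cust_id'
```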

2. Instantiate the nodes and define the graph
```python
cust = CustomerNode(pk='id', prefix='cust', fpath='dat/cust.csv', fmt='csv', compute_layer=ComputeLayerEnum.pandas)
order = OrderNode(pk='id', prefix='order', fpath='dat/orders.csv', fmt='csv', compute_layer=ComputeLayerEnum.pandas)

gr = GraphReduce(
    cut_date=datetime.datetime(2023, 5, 6),
    compute_period_val=365,
    compute_period_unit=PeriodUnit.day,
    parent_node=cust,
    compute_layer=ComputeLayerEnum.pandas,
    has_labels=False,
    label_period_val=30,
    label_period_unit=PeriodUnit.day,
    dynamic_propagation=True
)

# Add nodes and edges to the graph
gr.add_node(cust)
gr.add_node(order)

gr.add_entity_edge(
    parent_node=cust,
    relation_node=order,
    parent_key='id',
    relation_key='customer_id',
    relation_type='parent_child',
    reduce=True
)
```

Alternatively, skip the per-entity boilerplate entirely and build the graph from `DynamicNode` instances. With `auto_features=True`, `graphreduce` generates features automatically up to `auto_feature_hops_back` hops behind the parent node and `auto_feature_hops_front` hops in front of it, and computes a label from `label_node`:

```python
# source the relationships from a csv file
# using the file at: https://github.com/wesmadrigal/GraphReduce/blob/master/examples/cust_graph_labels.csv
reldf = pd.read_csv('cust_graph_labels.csv')

# using the data from: https://github.com/wesmadrigal/GraphReduce/tree/master/tests/data/cust_data
files = {
    'cust.csv': {'prefix': 'cu'},
    'orders.csv': {'prefix': 'ord'},
    'order_products.csv': {'prefix': 'op'},
    'notifications.csv': {'prefix': 'notif'},
    'notification_interactions.csv': {'prefix': 'ni'},
    'notification_interaction_types.csv': {'prefix': 'nit'}
}

# create graph reduce nodes
gr_nodes = {
    f.split('/')[-1]: DynamicNode(
        fpath=f,
        fmt='csv',
        pk='id',
        prefix=files[f]['prefix'],
        date_key=None,
        compute_layer=ComputeLayerEnum.pandas,
        compute_period_val=730,
        compute_period_unit=PeriodUnit.day,
    )
    for f in files.keys()
}

gr = GraphReduce(
    name='cust_dynamic_graph',
    parent_node=gr_nodes['cust.csv'],
    fmt='csv',
    cut_date=datetime.datetime(2023, 9, 1),
    compute_layer=ComputeLayerEnum.pandas,
    auto_features=True,
    auto_feature_hops_front=1,
    auto_feature_hops_back=2,
    label_node=gr_nodes['orders.csv'],
    label_operation='count',
    label_field='id',
    label_period_val=60,
    label_period_unit=PeriodUnit.day
)

# wire up the edges from the relationships file
# (the column names assumed here follow the example csv linked above)
for _, row in reldf.iterrows():
    gr.add_entity_edge(
        parent_node=gr_nodes[row['to_name']],
        relation_node=gr_nodes[row['from_name']],
        parent_key=row['to_key'],
        relation_key=row['from_key'],
        reduce=True
    )

gr.do_transformations()
```

Running the transformations logs each compute operation:

```
2024-04-23 13:49:41 [info ] hydrating graph attributes
2024-04-23 13:49:41 [info ] hydrating attributes for DynamicNode
2024-04-23 13:49:41 [info ] hydrating attributes for DynamicNode
2024-04-23 13:49:41 [info ] hydrating attributes for DynamicNode
2024-04-23 13:49:41 [info ] hydrating attributes for DynamicNode
2024-04-23 13:49:41 [info ] hydrating attributes for DynamicNode
2024-04-23 13:49:41 [info ] hydrating attributes for DynamicNode
2024-04-23 13:49:41 [info ] hydrating graph data
2024-04-23 13:49:41 [info ] checking for prefix uniqueness
2024-04-23 13:49:41 [info ] running filters, normalize, and annotations for <GraphReduceNode: fpath=notification_interaction_types.csv fmt=csv>
2024-04-23 13:49:41 [info ] running filters, normalize, and annotations for <GraphReduceNode: fpath=notification_interactions.csv fmt=csv>
2024-04-23 13:49:41 [info ] running filters, normalize, and annotations for <GraphReduceNode: fpath=notifications.csv fmt=csv>
2024-04-23 13:49:41 [info ] running filters, normalize, and annotations for <GraphReduceNode: fpath=orders.csv fmt=csv>
2024-04-23 13:49:41 [info ] running filters, normalize, and annotations for <GraphReduceNode: fpath=order_products.csv fmt=csv>
2024-04-23 13:49:41 [info ] running filters, normalize, and annotations for <GraphReduceNode: fpath=cust.csv fmt=csv>
2024-04-23 13:49:41 [info ] depth-first traversal through the graph from source: <GraphReduceNode: fpath=cust.csv fmt=csv>
2024-04-23 13:49:41 [info ] reducing relation <GraphReduceNode: fpath=notification_interactions.csv fmt=csv>
2024-04-23 13:49:41 [info ] performing auto_features on node <GraphReduceNode: fpath=notification_interactions.csv fmt=csv>
2024-04-23 13:49:41 [info ] joining <GraphReduceNode: fpath=notification_interactions.csv fmt=csv> to <GraphReduceNode: fpath=notifications.csv fmt=csv>
2024-04-23 13:49:41 [info ] reducing relation <GraphReduceNode: fpath=notifications.csv fmt=csv>
2024-04-23 13:49:41 [info ] performing auto_features on node <GraphReduceNode: fpath=notifications.csv fmt=csv>
2024-04-23 13:49:41 [info ] joining <GraphReduceNode: fpath=notifications.csv fmt=csv> to <GraphReduceNode: fpath=cust.csv fmt=csv>
2024-04-23 13:49:41 [info ] reducing relation <GraphReduceNode: fpath=order_products.csv fmt=csv>
2024-04-23 13:49:41 [info ] performing auto_features on node <GraphReduceNode: fpath=order_products.csv fmt=csv>
2024-04-23 13:49:41 [info ] joining <GraphReduceNode: fpath=order_products.csv fmt=csv> to <GraphReduceNode: fpath=orders.csv fmt=csv>
2024-04-23 13:49:41 [info ] reducing relation <GraphReduceNode: fpath=orders.csv fmt=csv>
2024-04-23 13:49:41 [info ] performing auto_features on node <GraphReduceNode: fpath=orders.csv fmt=csv>
2024-04-23 13:49:41 [info ] joining <GraphReduceNode: fpath=orders.csv fmt=csv> to <GraphReduceNode: fpath=cust.csv fmt=csv>
2024-04-23 13:49:41 [info ] Had label node <GraphReduceNode: fpath=orders.csv fmt=csv>
2024-04-23 13:49:41 [info ] computed labels for <GraphReduceNode: fpath=orders.csv fmt=csv>
```

The materialized output lands on the parent node's dataframe:

```python
gr.parent_node.df
```

```
cu_id cu_name notif_customer_id notif_id_count notif_customer_id_count notif_ts_first notif_ts_min notif_ts_max ni_notification_id_min ni_notification_id_max ni_notification_id_sum ni_id_count_min ni_id_count_max ni_id_count_sum ni_notification_id_count_min ni_notification_id_count_max ni_notification_id_count_sum ni_interaction_type_id_count_min ni_interaction_type_id_count_max ni_interaction_type_id_count_sum ni_ts_first_first ni_ts_first_min ni_ts_first_max ni_ts_min_first ni_ts_min_min ni_ts_min_max ni_ts_max_first ni_ts_max_min ni_ts_max_max ord_customer_id ord_id_count ord_customer_id_count ord_ts_first ord_ts_min ord_ts_max op_order_id_min op_order_id_max op_order_id_sum op_id_count_min op_id_count_max op_id_count_sum op_order_id_count_min op_order_id_count_max op_order_id_count_sum op_product_id_count_min op_product_id_count_max op_product_id_count_sum ord_customer_id_dupe ord_id_label
0 1 wes 1 6 6 2022-08-05 2022-08-05 2023-06-23 101.0 106.0 621.0 1.0 3.0 14.0 1.0 3.0 14.0 1.0 3.0 14.0 2022-08-06 2022-08-06 2023-05-15 2022-08-06 2022-08-06 2023-05-15 2022-08-08 2022-08-08 2023-05-15 1.0 2.0 2.0 2023-05-12 2023-05-12 2023-06-01 1.0 2.0 3.0 4.0 4.0 8.0 4.0 4.0 8.0 4.0 4.0 8.0 1.0 1.0
1 2 john 2 7 7 2022-09-05 2022-09-05 2023-05-22 107.0 110.0 434.0 1.0 1.0 4.0 1.0 1.0 4.0 1.0 1.0 4.0 2023-06-01 2023-06-01 2023-06-04 2023-06-01 2023-06-01 2023-06-04 2023-06-01 2023-06-01 2023-06-04 2.0 1.0 1.0 2023-01-01 2023-01-01 2023-01-01 3.0 3.0 3.0 4.0 4.0 4.0 4.0 4.0 4.0 4.0 4.0 4.0 NaN NaN
2 3 ryan 3 2 2 2023-06-12 2023-06-12 2023-09-01 NaN NaN 0.0 NaN NaN 0.0 NaN NaN 0.0 NaN NaN 0.0 NaT NaT NaT NaT NaT NaT NaT NaT NaT 3.0 1.0 1.0 2023-06-01 2023-06-01 2023-06-01 5.0 5.0 5.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 NaN NaN
3 4 tianji 4 2 2 2024-02-01 2024-02-01 2024-02-15 NaN NaN 0.0 NaN NaN 0.0 NaN NaN 0.0 NaN NaN 0.0
```
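
The generated columns follow a `prefix_column_aggfunc` naming pattern, so they are easy to select programmatically. A small illustrative snippet (prefixes taken from the `files` dict above):

```python
# collect the automatically generated feature columns by node prefix
feature_cols = [
    c for c in gr.parent_node.df.columns
    if c.startswith(('ord_', 'op_', 'notif_', 'ni_', 'nit_'))
]
print(f'{len(feature_cols)} generated feature columns')
```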

3. Plot the graph reduce compute graph.
```python
gr.plot_graph('my_graph_reduce.html')
```

4. Run compute operations
```python
gr.do_transformations()

2023-08-03 09:05:44 [info ] hydrating graph attributes
2023-08-03 09:05:44 [info ] hydrating attributes for CustomerNode
2023-08-03 09:05:44 [info ] hydrating attributes for OrderNode
2023-08-03 09:05:44 [info ] hydrating graph data
2023-08-03 09:05:44 [info ] checking for prefix uniqueness
2023-08-03 09:05:44 [info ] running filters, normalize, and annotations for CustomerNode
2023-08-03 09:05:44 [info ] running filters, normalize, and annotations for OrderNode
2023-08-03 09:05:44 [info ] depth-first traversal through the graph from source: CustomerNode
2023-08-03 09:05:44 [info ] reducing relation OrderNode
2023-08-03 09:05:44 [info ] doing dynamic propagation on node OrderNode
2023-08-03 09:05:44 [info ] joining OrderNode to CustomerNode
```

5. Use materialized dataframe for ML / analytics
```python
gr.parent_node.df.head()

cust_id cust_name order_customer_id order_id_count order_id_min order_id_max order_id_sum order_customer_id_min order_customer_id_max order_customer_id_sum order_ts_first
0 1 wes 1 2 1 2 3 1 1 2 2023-05-12
```
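
From here the flat table can feed a model directly. A hedged sketch (scikit-learn assumed; the target below is constructed purely for illustration and is not part of the example data):

```python
from sklearn.ensemble import RandomForestClassifier

df = gr.parent_node.df
feature_cols = ['order_id_count', 'order_id_sum']   # feature columns from the output above
X = df[feature_cols].fillna(0)
y = (df['order_id_count'] > 1).astype(int)          # hypothetical binary target
model = RandomForestClassifier(n_estimators=100, random_state=0).fit(X, y)
```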
graphreduce/cli/auto_fe.py: 2 additions & 2 deletions

The hop-count parameters of the `autofe_filesystem` CLI command change from positional `Argument`s to named `Option`s:

```python
def autofe_filesystem(
    # … earlier parameters elided in the diff …
    cut_date: str = Argument(str(datetime.datetime.today())),
    compute_layer: str = Argument("pandas"),
    hops_front: int = Option(1, '-hf', '--hops-front', help='number of front hops to perform'),
    hops_back: int = Option(3, '-hb', '--hops-back', help='number of back hops to perform'),
    output_path: str = Option(..., '-op', '--output-path', help='output path for the data')
):
    """
    …
```
