support for sql loading of spark dfs
wesmadrigal committed Aug 31, 2024
1 parent e184220 commit 6154388
Showing 6 changed files with 111 additions and 43 deletions.
4 changes: 3 additions & 1 deletion docs/abstractions.md
@@ -48,10 +48,12 @@ gr.add_entity_edge(
    parent_key='id',
    relation_key='customer_id',
    reduce=True
)
```

The `reduce` parameter tells `graphreduce` whether or not to execute
aggregation operations.
aggregation operations. In some cases a user may want to maintain
an aggregation operation but avoid executing it for a particular compute graph.
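
For instance, a minimal sketch of an edge that keeps the relationship
defined but skips aggregation for this graph (the `parent_node`/`relation_node`
argument names and the node variables are assumed from the full example,
which is truncated above):

```Python
# Sketch: same relationship as above, but reduce=False means no
# aggregation is executed for this particular compute graph.
gr.add_entity_edge(
    parent_node=customer_node,   # assumed variable names
    relation_node=order_node,
    parent_key='id',
    relation_key='customer_id',
    reduce=False
)
```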


## GraphReduce Graph
56 changes: 56 additions & 0 deletions docs/tutorial_method_deps.md
@@ -0,0 +1,56 @@
# Post-join method execution
There is a method decorator for specifying that
a node instance method can only be executed
once other nodes have been joined to the node.

For example, using the `cust.csv` and `orders.csv`
datasets from prior examples, let's say we want
to compute the difference between an order date
and the signup date of the customer. In this
case we need the timestamp of the order and the
timestamp of the customer.

```Python
# In the node definition of the customer
# specify this dependency
from graphreduce.node import GraphReduceNode
from graphreduce.context import method_requires

class CustomerNode(GraphReduceNode):
    def do_filters(self):
        pass
    ...
    ...

    @method_requires(nodes=[OrderNode])
    def do_post_join_annotate(self):
        self.df[self.colabbr('signup_order_diff_seconds')] = self.df.apply(
            lambda x: (x['ord_ts'] - x[self.colabbr('signup_date')]).total_seconds(),
            axis=1
        )
```

By using the `method_requires` decorator we've told graphreduce
that the `CustomerNode.do_post_join_annotate` method can only
be executed once the `OrderNode` has been joined.

There are cases where we need multiple child nodes' data merged
to a parent node before certain operations can be executed, such
as annotations and filters.

This would look like the following snippet:
```Python

class ParentNode(GraphReduceNode):
    ...

    @method_requires(nodes=[ChildNodeOne, ChildNodeThree])
    def do_post_join_annotate(self):
        self.df[self.colabbr('newcol')] = self.df.apply(
            lambda x: f"{x['col1']}-{x['child_col']}-{x['child3_col']}",
            axis=1
        )

    @method_requires(nodes=[ChildNodeOne, ChildNodeTwo, ChildNodeThree])
    def do_post_join_filters(self):
        # filter on columns coming from the joined child nodes
        self.df = self.df[...]
```
72 changes: 34 additions & 38 deletions docs/tutorial_swapping_compute.md
@@ -17,52 +17,35 @@ and start with a `pandas` backend, which is specified
with the `compute_layer` parameter.

```Python
prefixes = {
    'cust.csv': {'prefix': 'cu'},
    'orders.csv': {'prefix': 'ord'}
}
# create graph reduce nodes
gr_nodes = {
    f.split('/')[-1]: DynamicNode(
        fpath=f,
        fmt='csv',
        pk='id',
        prefix=prefixes[f]['prefix'],
        date_key=None,
        compute_layer=ComputeLayerEnum.pandas,
        compute_period_val=730,
        compute_period_unit=PeriodUnit.day,
    )
    for f in prefixes.keys()
}
cust_node = DynamicNode(
    fpath='cust.csv',
    fmt='csv',
    pk='id',
    prefix='cu',
    date_key=None,
    compute_layer=ComputeLayerEnum.pandas
)
gr_nodes['cust.csv'].do_data()
type(gr_nodes['cust.csv'].df)
cust_node.do_data()
type(cust_node.df)
pandas.core.frame.DataFrame
```


## Dask
Now we can instantiate the same nodes with `dask`:
```Python
# create graph reduce nodes
gr_nodes = {
    f.split('/')[-1]: DynamicNode(
        fpath=f,
        fmt='csv',
        pk='id',
        prefix=prefixes[f]['prefix'],
        date_key=None,
        compute_layer=ComputeLayerEnum.dask,
        compute_period_val=730,
        compute_period_unit=PeriodUnit.day,
    )
    for f in prefixes.keys()
}
cust_node = DynamicNode(
    fpath='cust.csv',
    fmt='csv',
    pk='id',
    prefix='cu',
    date_key=None,
    compute_layer=ComputeLayerEnum.dask
)

gr_nodes['cust.csv'].do_data()
type(gr_nodes['cust.csv'].df)
cust_node.do_data()
type(cust_node.df)
dask.dataframe.core.DataFrame
```

@@ -86,5 +69,18 @@ type(cloud_node.df)
pyspark.sql.dataframe.DataFrame
```
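
With the change in this commit (see `graphreduce/node.py` below), a Spark-backed
node should also be able to load its data through Spark SQL via the new `dialect`
parameter. A hedged sketch, assuming `DynamicNode` forwards `dialect` to the base
constructor and that a Spark session/context is supplied as in the (truncated)
Spark example above:

```Python
# Sketch of the behavior added in this commit: with dialect='sql',
# fpath is treated as a table name and loaded via spark_sqlctx.sql(...).
cust_node = DynamicNode(
    fpath='schema.customers',   # illustrative table name
    fmt='sql',
    pk='id',
    prefix='cu',
    date_key=None,
    compute_layer=ComputeLayerEnum.spark,
    dialect='sql',
)
cust_node.do_data()   # executes: select * from schema.customers
```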

## SQL compute engines
To use a SQL dialect we need the `SQLNode` class
and its subclasses.
and its subclasses. These are instantiated as follows:
```Python
from graphreduce.node import SQLNode
from graphreduce.enum import ComputeLayerEnum

cust_node = SQLNode(
    fpath='schema.customers',
    fmt='sql',
    pk='id',
    prefix='cu',
    date_key='signup_date',
    compute_layer=ComputeLayerEnum.sql
)
```
10 changes: 7 additions & 3 deletions docs/tutorial_time.md
@@ -63,17 +63,21 @@ to October 1, 2023. There are exactly 4 records that satisfy that
criteria, so we can see the `prep_for_features` function is working as expected.


## Putting it all together
Additionally, for the labels we see there is 1 record within 30
days of October 1, 2023, so we can see the `prep_for_labels`
function is working as expected.

Using the example from before with `cust.csv` and `orders.csv`,
let's say we want to compute features over only the last 6 months
and a label over the following 45 days.


## Top-level config

In the `GraphReduce` instance we specify `compute_period_val` and `label_period_val`.
These parameters control how much history is included during execution. For this
graph data from `2023/9/1` going back 180 days will be included in feature preparation
These parameters control how much history is included during execution. They are
pushed down through all the nodes in the graph, ensuring consistency across all nodes.
In the instance below, data from `2023/9/1` going back 180 days will be included in feature preparation
and data from `2023/9/1` going forward 45 days will be included in label preparation.
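
A minimal sketch of that top-level config; the period arguments follow the text
above, while the remaining `GraphReduce` constructor arguments (parent node, etc.)
are omitted and the exact signature should be checked against the installed version:

```Python
import datetime

from graphreduce.graph_reduce import GraphReduce
from graphreduce.enum import ComputeLayerEnum, PeriodUnit

# Sketch only: 180 days of history for features, 45 days forward for labels.
gr = GraphReduce(
    cut_date=datetime.datetime(2023, 9, 1),
    compute_period_val=180,
    compute_period_unit=PeriodUnit.day,
    label_period_val=45,
    label_period_unit=PeriodUnit.day,
    compute_layer=ComputeLayerEnum.pandas,
    # ...parent node and other arguments omitted
)
```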


9 changes: 8 additions & 1 deletion graphreduce/node.py
@@ -69,6 +69,8 @@ def __init__ (
prefix : str = None,
date_key : str = None,
compute_layer : ComputeLayerEnum = None,
# 'python' or 'sql'
dialect: str = 'python',
cut_date : datetime.datetime = datetime.datetime.now(),
compute_period_val : typing.Union[int, float] = 365,
compute_period_unit : PeriodUnit = PeriodUnit.day,
@@ -98,6 +100,7 @@ def __init__ (
self.fpath = fpath
self.fmt = fmt
self.compute_layer = compute_layer
self.dialect = dialect
self.cut_date = cut_date
self.compute_period_val = compute_period_val
self.compute_period_unit = compute_period_unit
@@ -191,7 +194,11 @@ def do_data (
self.df.columns = [f"{self.prefix}_{c}" for c in self.df.columns]
elif self.compute_layer.value == 'spark':
if not hasattr(self, 'df') or (hasattr(self, 'df') and not isinstance(self.df, pyspark.sql.DataFrame)):
self.df = getattr(self.spark_sqlctx.read, f"{self.fmt}")(self.fpath)
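# With dialect='python' the data is loaded through the Spark DataFrame reader;
# with dialect='sql' fpath is treated as a table name and loaded via spark_sqlctx.sql.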
if self.dialect == 'python':
self.df = getattr(self.spark_sqlctx.read, f"{self.fmt}")(self.fpath)
elif self.dialect == 'sql':
self.df = self.spark_sqlctx.sql(f"select * from {self.fpath}")

if self.columns:
self.df = self.df.select(self.columns)
for c in self.df.columns:
3 changes: 3 additions & 0 deletions mkdocs.yml
@@ -18,6 +18,9 @@ nav:
- tutorial_custom_graph.md
- tutorial_swapping_compute.md
- tutorial_sql_dialects.md
- tutorial_method_deps.md
- Command line interface:
- tutorial_cli.md
#- tutorial_join_deps.md
#- tutorial_pandas_dask.md
#- tutorial_spark.md
