Release/1.4.1 #375

Merged 75 commits on Sep 16, 2024
Changes from all commits
Commits (75):
fc41ab9
[MLOP-634] Butterfree dev workflow, set triggers for branches staging…
moromimay Feb 8, 2021
4be4ffe
[BUG] Fix Staging GithubActions Pipeline (#283)
moromimay Feb 8, 2021
a3a601b
Apply only wheel. (#285)
moromimay Feb 8, 2021
4339608
[BUG] Change version on setup.py to PyPI (#286)
moromimay Feb 9, 2021
a82433c
Keep milliseconds when using 'from_ms' argument in timestamp feature …
hmeretti Feb 9, 2021
dcbf540
Change trigger for pipeline staging (#287)
moromimay Feb 10, 2021
a0a9335
Create a dev package. (#288)
moromimay Feb 10, 2021
7427898
[MLOP-633] Butterfree dev workflow, update documentation (#281)
moromimay Feb 10, 2021
245eaa5
[MLOP-632] Butterfree dev workflow, automate release description (#279)
AlvaroMarquesAndrade Feb 11, 2021
d6ecfa4
[MLOP-636] Create migration classes (#282)
AlvaroMarquesAndrade Feb 18, 2021
32e24d6
[MLOP-635] Rebase Incremental Job/Interval Run branch for test on sel…
moromimay Feb 19, 2021
8da89ed
Allow slide selection (#293)
roelschr Feb 22, 2021
0df07ae
Fix Slide Duration Typo (#295)
AlvaroMarquesAndrade Feb 26, 2021
aeb7999
[MLOP-637] Implement diff method (#292)
moromimay Mar 8, 2021
9afc39c
[MLOP-640] Create CLI with migrate command (#298)
roelschr Mar 15, 2021
bf204f2
[MLOP-645] Implement query method, cassandra (#291)
AlvaroMarquesAndrade Mar 15, 2021
b518dbc
[MLOP-671] Implement get_schema on Spark client (#301)
AlvaroMarquesAndrade Mar 16, 2021
5fe4c40
[MLOP-648] Implement query method, metastore (#294)
AlvaroMarquesAndrade Mar 16, 2021
e8fc0da
Fix Validation Step (#302)
AlvaroMarquesAndrade Mar 22, 2021
3d93a09
[MLOP-647] [MLOP-646] Apply migrations (#300)
roelschr Mar 23, 2021
0d30932
[BUG] Apply create_partitions to historical validate (#303)
moromimay Mar 30, 2021
d607297
[BUG] Fix key path for validate read (#304)
moromimay Mar 30, 2021
3dcd975
[FIX] Add Partition types for Metastore (#305)
AlvaroMarquesAndrade Apr 1, 2021
8077d86
[MLOP-639] Track logs in S3 (#306)
moromimay Apr 1, 2021
6d2a8f9
[BUG] Change logging config (#307)
moromimay Apr 6, 2021
d2c5d39
Change solution for tracking logs (#308)
moromimay Apr 8, 2021
43392f4
Read and write consistency level options (#309)
github-felipe-caputo Apr 13, 2021
0f31164
Fix kafka reader. (#310)
moromimay Apr 14, 2021
e6f67e9
Fix path validate. (#311)
moromimay Apr 14, 2021
baa594b
Add local dc property (#312)
github-felipe-caputo Apr 16, 2021
a74f098
Remove metastore migrate (#313)
moromimay Apr 20, 2021
378f3a5
Fix link in our docs. (#315)
moromimay Apr 20, 2021
3b18b5a
[BUG] Fix Cassandra Connect Session (#316)
moromimay Apr 23, 2021
c46f171
Fix migration query. (#318)
moromimay Apr 26, 2021
bb124f5
Fix migration query add type key. (#319)
moromimay Apr 28, 2021
1c97316
Fix db-config condition (#321)
moromimay May 5, 2021
bb7ed77
MLOP-642 Document migration in Butterfree (#320)
roelschr May 7, 2021
5a0a622
[MLOP-702] Debug mode for Automate Migration (#322)
moromimay May 10, 2021
b1371f1
[MLOP-727] Improve logging messages (#325)
GaBrandao Jun 2, 2021
acf7022
[MLOP-728] Improve logging messages (#324)
moromimay Jun 2, 2021
d0bf61a
Fix method to generate agg feature name. (#326)
moromimay Jun 4, 2021
1cf0dbd
[MLOP-691] Include step to add partition to SparkMetastore during wr…
moromimay Jun 10, 2021
9f42f53
Add the missing link for H3 geohash (#330)
jdvala Jun 16, 2021
78927e3
Update README.md (#331)
Jul 30, 2021
43bb3a3
Update Github Actions Workflow runner (#332)
Aug 22, 2022
2593839
Delete sphinx version. (#334)
moromimay Dec 20, 2022
35bcd30
Update files to staging (#336)
moromimay Dec 21, 2022
3a73ed8
Revert "Update files to staging (#336)" (#337)
moromimay Jan 2, 2023
6b78a50
Less strict requirements (#333)
lecardozo Aug 16, 2023
2a19009
feat: optional row count validation (#340)
ralphrass Aug 18, 2023
ca1a16d
fix: parameter, libs (#341)
ralphrass Aug 18, 2023
60c7ee4
pre-release 1.2.2.dev0 (#342)
ralphrass Aug 21, 2023
f35d665
Rebase staging (#343)
ralphrass Aug 21, 2023
97e44fa
Rebase staging from master (#345)
ralphrass Aug 21, 2023
9bcca0e
feat(MLOP-1985): optional params (#347)
ralphrass Nov 13, 2023
512a0fe
pre-release 1.2.3 (#349)
ralphrass Nov 13, 2023
688a5b3
feat(MLOP-2145): add feature set creation script (#351)
ralphrass Apr 11, 2024
da91b49
Rebase staging from master (#354)
ralphrass Apr 25, 2024
887fbb2
feat(mlop-2269): bump versions (#355)
ralphrass May 29, 2024
5af8a05
fix: sphinx version (#356)
ralphrass Jun 3, 2024
cbda73d
fix: publish and dev versions (#359)
ralphrass Jun 7, 2024
2a5a6e8
feat(MLOP-2236): add NTZ (#360)
ralphrass Jun 14, 2024
6363e03
fix: cassandra configs (#364)
ralphrass Jun 20, 2024
81c2c17
fix: Cassandra config keys (#366)
ralphrass Jun 28, 2024
b1949cd
fix: new type (#368)
ralphrass Jun 28, 2024
12d5e98
Delete .checklist.yaml (#371)
fernandrone Aug 16, 2024
35dd929
Add Delta support (#370)
ralphrass Aug 19, 2024
f6c5db6
Fix dup code (#373)
ralphrass Aug 21, 2024
11cc5d5
fix: performance improvements (#374)
ralphrass Sep 16, 2024
2b771aa
release 1.4.1
ralphrass Sep 16, 2024
ae47a54
Merge branch 'master' into release/1.4.1
ralphrass Sep 16, 2024
fdcb782
style
ralphrass Sep 16, 2024
3d52fa0
fix: var
ralphrass Sep 16, 2024
c4c1e5b
fix: test
ralphrass Sep 16, 2024
11a2973
fix: style
ralphrass Sep 16, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md

@@ -5,6 +5,9 @@ Preferably use **Added**, **Changed**, **Removed** and **Fixed** topics in each
 
 ## [Unreleased]
 
+## [1.4.1](https://github.com/quintoandar/butterfree/releases/tag/1.4.1)
+* Performance Improvements ([#374](https://github.com/quintoandar/butterfree/pull/374))
+
 ## [1.4.0](https://github.com/quintoandar/butterfree/releases/tag/1.4.0)
 * Add Delta support ([#370](https://github.com/quintoandar/butterfree/pull/370))
14 changes: 12 additions & 2 deletions butterfree/_cli/migrate.py

@@ -4,7 +4,7 @@
 import os
 import pkgutil
 import sys
-from typing import Set
+from typing import Set, Type
 
 import boto3
 import setuptools

@@ -90,8 +90,18 @@ def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
 
                instances.add(value)
 
+    def create_instance(cls: Type[FeatureSetPipeline]) -> FeatureSetPipeline:
+        sig = inspect.signature(cls.__init__)
+        parameters = sig.parameters
+
+        if "run_date" in parameters:
+            run_date = datetime.datetime.today().strftime("%y-%m-%d")
+            return cls(run_date)
+
+        return cls()
+
     logger.info("Creating instances...")
-    return set(value() for value in instances)  # type: ignore
+    return set(create_instance(value) for value in instances)  # type: ignore
 
 
 PATH = typer.Argument(
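The `create_instance` helper above decides at runtime whether a discovered pipeline class takes a `run_date` constructor argument. A minimal standalone sketch of the same `inspect.signature` pattern; the `DailyPipeline` and `StaticPipeline` classes are hypothetical stand-ins, not part of butterfree:

```python
import datetime
import inspect


class DailyPipeline:
    """Hypothetical pipeline whose constructor expects a run_date."""

    def __init__(self, run_date: str):
        self.run_date = run_date


class StaticPipeline:
    """Hypothetical pipeline with a no-argument constructor."""

    def __init__(self):
        self.run_date = None


def create_instance(cls):
    # Look at the constructor's parameter names at runtime.
    parameters = inspect.signature(cls.__init__).parameters
    if "run_date" in parameters:
        # Same "%y-%m-%d" format as the diff above.
        return cls(datetime.datetime.today().strftime("%y-%m-%d"))
    return cls()


print(create_instance(DailyPipeline).run_date)   # e.g. "24-09-16"
print(create_instance(StaticPipeline).run_date)  # None
```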
14 changes: 10 additions & 4 deletions butterfree/extract/source.py

@@ -3,6 +3,7 @@
 from typing import List, Optional
 
 from pyspark.sql import DataFrame
+from pyspark.storagelevel import StorageLevel
 
 from butterfree.clients import SparkClient
 from butterfree.extract.readers.reader import Reader

@@ -95,16 +96,21 @@ def construct(
             DataFrame with the query result against all readers.
 
         """
+        # Step 1: Build temporary views for each reader
         for reader in self.readers:
-            reader.build(
-                client=client, start_date=start_date, end_date=end_date
-            )  # create temporary views for each reader
+            reader.build(client=client, start_date=start_date, end_date=end_date)
 
+        # Step 2: Execute SQL query on the combined readers
         dataframe = client.sql(self.query)
 
+        # Step 3: Cache the dataframe if necessary, using memory and disk storage
        if not dataframe.isStreaming and self.eager_evaluation:
-            dataframe.cache().count()
+            # Persist to ensure the DataFrame is stored in mem and disk (if necessary)
+            dataframe.persist(StorageLevel.MEMORY_AND_DISK)
+            # Trigger the cache/persist operation by performing an action
+            dataframe.count()
 
+        # Step 4: Run post-processing hooks on the dataframe
         post_hook_df = self.run_post_hooks(dataframe)
 
         return post_hook_df
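The move from `cache()` to `persist(StorageLevel.MEMORY_AND_DISK)` makes the storage level explicit (for DataFrames, `cache()` already defaults to a memory-and-disk level in recent Spark releases), and the follow-up `count()` is what actually materializes the data, since persistence alone is lazy. A minimal sketch of that behavior, assuming a local Spark session:

```python
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel

spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.range(1_000_000)  # stand-in for the Source query result

# persist() is lazy: it only marks the DataFrame for storage.
df.persist(StorageLevel.MEMORY_AND_DISK)

# An action fills the cache; count() scans every row, so the
# whole DataFrame gets materialized to memory/disk.
df.count()

print(df.storageLevel)  # confirms the effective storage level

# Release the cached blocks once downstream consumers are done.
df.unpersist()
```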
30 changes: 22 additions & 8 deletions butterfree/pipelines/feature_set_pipeline.py

@@ -2,6 +2,8 @@
 
 from typing import List, Optional
 
+from pyspark.storagelevel import StorageLevel
+
 from butterfree.clients import SparkClient
 from butterfree.dataframe_service import repartition_sort_df
 from butterfree.extract import Source

@@ -209,35 +211,47 @@ def run(
             soon. Use only if strictly necessary.
 
         """
-
+        # Step 1: Construct input dataframe from the source.
         dataframe = self.source.construct(
             client=self.spark_client,
             start_date=self.feature_set.define_start_date(start_date),
             end_date=end_date,
         )
 
+        # Step 2: Repartition and sort if required, avoid if not necessary.
         if partition_by:
             order_by = order_by or partition_by
-            dataframe = repartition_sort_df(
-                dataframe, partition_by, order_by, num_processors
-            )
 
-        dataframe = self.feature_set.construct(
+            current_partitions = dataframe.rdd.getNumPartitions()
+            optimal_partitions = num_processors or current_partitions
+            if current_partitions != optimal_partitions:
+                dataframe = repartition_sort_df(
+                    dataframe, partition_by, order_by, num_processors
+                )
+
+        # Step 3: Construct the feature set dataframe using defined transformations.
+        transformed_dataframe = self.feature_set.construct(
             dataframe=dataframe,
             client=self.spark_client,
             start_date=start_date,
             end_date=end_date,
             num_processors=num_processors,
         )
 
+        if dataframe.storageLevel != StorageLevel.NONE:
+            dataframe.unpersist()  # Clear the data from the cache (disk and memory)
+
+        # Step 4: Load the data into the configured sink.
         self.sink.flush(
-            dataframe=dataframe,
+            dataframe=transformed_dataframe,
             feature_set=self.feature_set,
             spark_client=self.spark_client,
         )
 
-        if not dataframe.isStreaming:
+        # Step 5: Validate the output if not streaming and data volume is reasonable.
+        if not transformed_dataframe.isStreaming:
             self.sink.validate(
-                dataframe=dataframe,
+                dataframe=transformed_dataframe,
                 feature_set=self.feature_set,
                 spark_client=self.spark_client,
             )
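Two ideas carry this hunk: only repartition when the target partition count differs from the current one (a repartition is a full shuffle, so a no-op call is pure cost), and unpersist the source dataframe once the feature set has been built from it. A sketch of both guards, using plain `repartition()` as a stand-in for butterfree's `repartition_sort_df` helper:

```python
from typing import Optional

from pyspark.sql import DataFrame
from pyspark.storagelevel import StorageLevel


def repartition_if_needed(df: DataFrame, num_processors: Optional[int]) -> DataFrame:
    # Falling back to the current layout when no target is given
    # makes the comparison below a guaranteed no-op.
    current_partitions = df.rdd.getNumPartitions()
    optimal_partitions = num_processors or current_partitions
    if current_partitions != optimal_partitions:
        # Only pay for the shuffle when it actually changes the layout.
        df = df.repartition(optimal_partitions)
    return df


def release(df: DataFrame) -> None:
    # Mirror of the unpersist guard in run(): skip the call when
    # the DataFrame was never persisted.
    if df.storageLevel != StorageLevel.NONE:
        df.unpersist()
```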
32 changes: 19 additions & 13 deletions butterfree/transform/aggregated_feature_set.py

@@ -387,6 +387,7 @@ def _aggregate(
         ]
 
         groupby = self.keys_columns.copy()
+
         if window is not None:
             dataframe = dataframe.withColumn("window", window.get())
             groupby.append("window")

@@ -410,19 +411,23 @@
             "keep_rn", functions.row_number().over(partition_window)
         ).filter("keep_rn = 1")
 
-        # repartition to have all rows for each group at the same partition
-        # by doing that, we won't have to shuffle data on grouping by id
-        dataframe = repartition_df(
-            dataframe,
-            partition_by=groupby,
-            num_processors=num_processors,
-        )
+        current_partitions = dataframe.rdd.getNumPartitions()
+        optimal_partitions = num_processors or current_partitions
+
+        if current_partitions != optimal_partitions:
+            dataframe = repartition_df(
+                dataframe,
+                partition_by=groupby,
+                num_processors=optimal_partitions,
+            )
+
         grouped_data = dataframe.groupby(*groupby)
 
-        if self._pivot_column:
+        if self._pivot_column and self._pivot_values:
             grouped_data = grouped_data.pivot(self._pivot_column, self._pivot_values)
 
         aggregated = grouped_data.agg(*aggregations)
+
         return self._with_renamed_columns(aggregated, features, window)
 
     def _with_renamed_columns(

@@ -637,12 +642,13 @@ def construct(
         output_df = output_df.select(*self.columns).replace(  # type: ignore
             float("nan"), None
         )
-        if not output_df.isStreaming:
-            if self.deduplicate_rows:
-                output_df = self._filter_duplicated_rows(output_df)
-            if self.eager_evaluation:
-                output_df.cache().count()
+
+        if not output_df.isStreaming and self.deduplicate_rows:
+            output_df = self._filter_duplicated_rows(output_df)
 
         post_hook_df = self.run_post_hooks(output_df)
 
+        if not output_df.isStreaming and self.eager_evaluation:
+            post_hook_df.cache().count()
+
         return post_hook_df
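Besides the repartition guard, `_aggregate` now only pivots when explicit pivot values exist, and eager evaluation caches `post_hook_df`, the dataframe that is actually returned, rather than the pre-hook `output_df`. Passing the values to `pivot()` spares Spark an extra job to discover the distinct values. A small self-contained sketch; the column and values are made up for illustration:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.createDataFrame(
    [("a", "credit", 10.0), ("a", "debit", 5.0), ("b", "credit", 7.0)],
    ["id", "kind", "amount"],
)

pivot_column = "kind"
pivot_values = ["credit", "debit"]  # known up front

grouped_data = df.groupby("id")
if pivot_column and pivot_values:
    # With explicit values Spark skips the extra pass it would
    # otherwise need to compute the distinct pivot values.
    grouped_data = grouped_data.pivot(pivot_column, pivot_values)

grouped_data.agg(F.sum("amount")).show()
```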
7 changes: 2 additions & 5 deletions butterfree/transform/feature_set.py

@@ -436,11 +436,8 @@ def construct(
             pre_hook_df,
         ).select(*self.columns)
 
-        if not output_df.isStreaming:
-            if self.deduplicate_rows:
-                output_df = self._filter_duplicated_rows(output_df)
-            if self.eager_evaluation:
-                output_df.cache().count()
+        if not output_df.isStreaming and self.deduplicate_rows:
+            output_df = self._filter_duplicated_rows(output_df)
 
         output_df = self.incremental_strategy.filter_with_incremental_strategy(
             dataframe=output_df, start_date=start_date, end_date=end_date
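Here the nested streaming/dedup/eager conditionals collapse into a single check, and eager caching disappears from this path. For context, `_filter_duplicated_rows` relies on the same window-and-row-number idea visible as `keep_rn` in the aggregated feature set diff above; a hypothetical sketch of that dedup pattern, with key and ordering columns invented for illustration:

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.createDataFrame(
    [(1, "2024-09-16", 1.0), (1, "2024-09-16", 1.0), (2, "2024-09-16", 2.0)],
    ["id", "timestamp", "feature"],
)

# Keep a single row per (id, timestamp), as in the keep_rn pattern above.
partition_window = Window.partitionBy("id", "timestamp").orderBy("feature")
deduplicated = (
    df.withColumn("keep_rn", F.row_number().over(partition_window))
    .filter("keep_rn = 1")
    .drop("keep_rn")
)
deduplicated.show()
```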
20 changes: 0 additions & 20 deletions docs/source/butterfree.configs.rst

@@ -23,26 +23,6 @@ butterfree.configs.environment module
 butterfree.configs.logger module
 --------------------------------
 
-.. automodule:: butterfree.configs.logger
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-.. automodule:: butterfree.configs.logger
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-.. automodule:: butterfree.configs.logger
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-.. automodule:: butterfree.configs.logger
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
 .. automodule:: butterfree.configs.logger
    :members:
    :undoc-members:
42 changes: 0 additions & 42 deletions docs/source/butterfree.constants.rst

@@ -31,28 +31,6 @@ butterfree.constants.migrations module
 butterfree.constants.spark\_constants module
 --------------------------------------------
 
-.. automodule:: butterfree.constants.migrations
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-
-.. automodule:: butterfree.constants.migrations
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-
-.. automodule:: butterfree.constants.migrations
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-
-.. automodule:: butterfree.constants.migrations
-   :members:
-   :undoc-members:
-   :show-inheritance:
 
 .. automodule:: butterfree.constants.spark_constants
    :members:

@@ -62,26 +40,6 @@ butterfree.constants.spark\_constants module
 butterfree.constants.window\_definitions module
 -----------------------------------------------
 
-.. automodule:: butterfree.constants.window_definitions
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-.. automodule:: butterfree.constants.window_definitions
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-.. automodule:: butterfree.constants.window_definitions
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-.. automodule:: butterfree.constants.window_definitions
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
 .. automodule:: butterfree.constants.window_definitions
    :members:
    :undoc-members:
8 changes: 0 additions & 8 deletions docs/source/butterfree.dataframe_service.rst

@@ -20,14 +20,6 @@ butterfree.dataframe\_service.partitioning module
    :undoc-members:
    :show-inheritance:
 
-butterfree.dataframe\_service.repartition module
-------------------------------------------------
-
-.. automodule:: butterfree.dataframe_service.repartition
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
 .. automodule:: butterfree.dataframe_service.repartition
    :members:
    :undoc-members:
2 changes: 1 addition & 1 deletion setup.py

@@ -1,7 +1,7 @@
 from setuptools import find_packages, setup
 
 __package_name__ = "butterfree"
-__version__ = "1.4.0"
+__version__ = "1.4.1"
 __repository_url__ = "https://github.com/quintoandar/butterfree"
 
 with open("requirements.txt") as f: