diff --git a/CHANGELOG.md b/CHANGELOG.md
index 19d9b5f4..2294c20d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,9 @@ Preferably use **Added**, **Changed**, **Removed** and **Fixed** topics in each
 
 ## [Unreleased]
 
+## [1.4.1](https://github.com/quintoandar/butterfree/releases/tag/1.4.1)
+* Performance Improvements ([#374](https://github.com/quintoandar/butterfree/pull/374))
+
 ## [1.4.0](https://github.com/quintoandar/butterfree/releases/tag/1.4.0)
 * Add Delta support ([#370](https://github.com/quintoandar/butterfree/pull/370))
 
diff --git a/butterfree/_cli/migrate.py b/butterfree/_cli/migrate.py
index f5161509..6bd5ca08 100644
--- a/butterfree/_cli/migrate.py
+++ b/butterfree/_cli/migrate.py
@@ -4,7 +4,7 @@
 import os
 import pkgutil
 import sys
-from typing import Set
+from typing import Set, Type
 
 import boto3
 import setuptools
@@ -90,8 +90,18 @@ def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
             instances.add(value)
 
+    def create_instance(cls: Type[FeatureSetPipeline]) -> FeatureSetPipeline:
+        sig = inspect.signature(cls.__init__)
+        parameters = sig.parameters
+
+        if "run_date" in parameters:
+            run_date = datetime.datetime.today().strftime("%y-%m-%d")
+            return cls(run_date)
+
+        return cls()
+
     logger.info("Creating instances...")
-    return set(value() for value in instances)  # type: ignore
+    return set(create_instance(value) for value in instances)  # type: ignore
 
 
 PATH = typer.Argument(
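Note: the create_instance helper added above decides how to build each discovered pipeline by inspecting its constructor signature. A minimal standalone sketch of the same pattern follows, using only the standard library; the two pipeline classes are hypothetical stand-ins, not butterfree code.

    import datetime
    import inspect


    class StaticPipeline:
        """Hypothetical pipeline whose constructor takes no arguments."""


    class DatedPipeline:
        """Hypothetical pipeline whose constructor expects a run_date string."""

        def __init__(self, run_date: str) -> None:
            self.run_date = run_date


    def create_instance(cls):
        # Pass today's date only when the constructor declares a run_date parameter.
        parameters = inspect.signature(cls.__init__).parameters
        if "run_date" in parameters:
            return cls(datetime.datetime.today().strftime("%y-%m-%d"))
        return cls()


    print(type(create_instance(StaticPipeline)).__name__)  # StaticPipeline
    print(create_instance(DatedPipeline).run_date)  # e.g. "25-01-31"

Inspecting the signature keeps the CLI backwards compatible: pipelines that never declared run_date are still constructed with no arguments.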
diff --git a/butterfree/extract/source.py b/butterfree/extract/source.py
index bfc15271..9d50e94c 100644
--- a/butterfree/extract/source.py
+++ b/butterfree/extract/source.py
@@ -3,6 +3,7 @@
 from typing import List, Optional
 
 from pyspark.sql import DataFrame
+from pyspark.storagelevel import StorageLevel
 
 from butterfree.clients import SparkClient
 from butterfree.extract.readers.reader import Reader
@@ -95,16 +96,21 @@ def construct(
             DataFrame with the query result against all readers.
 
         """
+        # Step 1: Build temporary views for each reader
         for reader in self.readers:
-            reader.build(
-                client=client, start_date=start_date, end_date=end_date
-            )  # create temporary views for each reader
+            reader.build(client=client, start_date=start_date, end_date=end_date)
 
+        # Step 2: Execute SQL query on the combined readers
         dataframe = client.sql(self.query)
 
+        # Step 3: Cache the dataframe if necessary, using memory and disk storage
         if not dataframe.isStreaming and self.eager_evaluation:
-            dataframe.cache().count()
+            # Persist the dataframe in memory, spilling to disk if necessary
+            dataframe.persist(StorageLevel.MEMORY_AND_DISK)
+            # Trigger the cache/persist operation by performing an action
+            dataframe.count()
 
+        # Step 4: Run post-processing hooks on the dataframe
         post_hook_df = self.run_post_hooks(dataframe)
 
         return post_hook_df
diff --git a/butterfree/pipelines/feature_set_pipeline.py b/butterfree/pipelines/feature_set_pipeline.py
index 8ba1a636..464b821b 100644
--- a/butterfree/pipelines/feature_set_pipeline.py
+++ b/butterfree/pipelines/feature_set_pipeline.py
@@ -2,6 +2,8 @@
 
 from typing import List, Optional
 
+from pyspark.storagelevel import StorageLevel
+
 from butterfree.clients import SparkClient
 from butterfree.dataframe_service import repartition_sort_df
 from butterfree.extract import Source
@@ -209,19 +211,26 @@ def run(
             soon. Use only if strictly necessary.
 
         """
+
+        # Step 1: Construct input dataframe from the source.
         dataframe = self.source.construct(
             client=self.spark_client,
             start_date=self.feature_set.define_start_date(start_date),
             end_date=end_date,
         )
 
+        # Step 2: Repartition and sort if required; skip when not needed.
         if partition_by:
             order_by = order_by or partition_by
-            dataframe = repartition_sort_df(
-                dataframe, partition_by, order_by, num_processors
-            )
-
-        dataframe = self.feature_set.construct(
+            current_partitions = dataframe.rdd.getNumPartitions()
+            optimal_partitions = num_processors or current_partitions
+            if current_partitions != optimal_partitions:
+                dataframe = repartition_sort_df(
+                    dataframe, partition_by, order_by, num_processors
+                )
+
+        # Step 3: Construct the feature set dataframe using defined transformations.
+        transformed_dataframe = self.feature_set.construct(
             dataframe=dataframe,
             client=self.spark_client,
             start_date=start_date,
@@ -229,15 +238,20 @@ def run(
             num_processors=num_processors,
         )
 
+        if dataframe.storageLevel != StorageLevel.NONE:
+            dataframe.unpersist()  # Clear the data from the cache (disk and memory)
+
+        # Step 4: Load the data into the configured sink.
         self.sink.flush(
-            dataframe=dataframe,
+            dataframe=transformed_dataframe,
             feature_set=self.feature_set,
             spark_client=self.spark_client,
         )
 
-        if not dataframe.isStreaming:
+        # Step 5: Validate the output if not streaming and data volume is reasonable.
+        if not transformed_dataframe.isStreaming:
             self.sink.validate(
-                dataframe=dataframe,
+                dataframe=transformed_dataframe,
                 feature_set=self.feature_set,
                 spark_client=self.spark_client,
             )
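Note: the source and pipeline changes above share one cache lifecycle: persist the source dataframe with an explicit MEMORY_AND_DISK level, materialize it once with count(), reuse it to build the transformed dataframe, then unpersist it. A minimal sketch of that lifecycle, assuming a local SparkSession and toy data rather than butterfree's Source and FeatureSet classes:

    from pyspark.sql import SparkSession
    from pyspark.storagelevel import StorageLevel

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    # Stand-in for the dataframe returned by Source.construct().
    source_df = spark.range(1_000).withColumnRenamed("id", "key")

    # Persist with an explicit storage level and force materialization with an action.
    source_df.persist(StorageLevel.MEMORY_AND_DISK)
    source_df.count()

    # Build the derived dataframe while the source is cached.
    transformed_df = source_df.groupBy((source_df.key % 10).alias("bucket")).count()
    transformed_df.count()

    # Release the cached source once downstream results no longer need it.
    if source_df.storageLevel != StorageLevel.NONE:
        source_df.unpersist()

On DataFrames, cache() already defaults to a memory-and-disk level, so the practical gain here is the explicitness plus the guarded unpersist(), which frees executor memory before the sink flush and validation run.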
diff --git a/butterfree/transform/aggregated_feature_set.py b/butterfree/transform/aggregated_feature_set.py
index 6706bf8c..fbd46227 100644
--- a/butterfree/transform/aggregated_feature_set.py
+++ b/butterfree/transform/aggregated_feature_set.py
@@ -387,6 +387,7 @@ def _aggregate(
         ]
 
         groupby = self.keys_columns.copy()
+
         if window is not None:
             dataframe = dataframe.withColumn("window", window.get())
             groupby.append("window")
@@ -410,19 +411,23 @@ def _aggregate(
                 "keep_rn", functions.row_number().over(partition_window)
             ).filter("keep_rn = 1")
 
-        # repartition to have all rows for each group at the same partition
-        # by doing that, we won't have to shuffle data on grouping by id
-        dataframe = repartition_df(
-            dataframe,
-            partition_by=groupby,
-            num_processors=num_processors,
-        )
+        current_partitions = dataframe.rdd.getNumPartitions()
+        optimal_partitions = num_processors or current_partitions
+
+        if current_partitions != optimal_partitions:
+            dataframe = repartition_df(
+                dataframe,
+                partition_by=groupby,
+                num_processors=optimal_partitions,
+            )
+
         grouped_data = dataframe.groupby(*groupby)
 
-        if self._pivot_column:
+        if self._pivot_column and self._pivot_values:
             grouped_data = grouped_data.pivot(self._pivot_column, self._pivot_values)
 
         aggregated = grouped_data.agg(*aggregations)
+
         return self._with_renamed_columns(aggregated, features, window)
 
     def _with_renamed_columns(
@@ -637,12 +642,13 @@ def construct(
         output_df = output_df.select(*self.columns).replace(  # type: ignore
             float("nan"), None
         )
-        if not output_df.isStreaming:
-            if self.deduplicate_rows:
-                output_df = self._filter_duplicated_rows(output_df)
-            if self.eager_evaluation:
-                output_df.cache().count()
+
+        if not output_df.isStreaming and self.deduplicate_rows:
+            output_df = self._filter_duplicated_rows(output_df)
 
         post_hook_df = self.run_post_hooks(output_df)
 
+        if not output_df.isStreaming and self.eager_evaluation:
+            post_hook_df.cache().count()
+
         return post_hook_df
diff --git a/butterfree/transform/feature_set.py b/butterfree/transform/feature_set.py
index 369eaf29..2c4b9b51 100644
--- a/butterfree/transform/feature_set.py
+++ b/butterfree/transform/feature_set.py
@@ -436,11 +436,8 @@ def construct(
             pre_hook_df,
         ).select(*self.columns)
 
-        if not output_df.isStreaming:
-            if self.deduplicate_rows:
-                output_df = self._filter_duplicated_rows(output_df)
-            if self.eager_evaluation:
-                output_df.cache().count()
+        if not output_df.isStreaming and self.deduplicate_rows:
+            output_df = self._filter_duplicated_rows(output_df)
 
         output_df = self.incremental_strategy.filter_with_incremental_strategy(
             dataframe=output_df, start_date=start_date, end_date=end_date
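Note: both the pipeline and the aggregated feature set now skip repartitioning when the dataframe already has the target number of partitions. The guard is sketched below as a standalone helper; repartition_if_needed is illustrative and simpler than butterfree's repartition_df / repartition_sort_df, which also partition by the group-by columns.

    from typing import Optional

    from pyspark.sql import DataFrame, SparkSession


    def repartition_if_needed(
        df: DataFrame, num_processors: Optional[int] = None
    ) -> DataFrame:
        current_partitions = df.rdd.getNumPartitions()
        optimal_partitions = num_processors or current_partitions
        if current_partitions != optimal_partitions:
            # Shuffle only when the target partition count actually differs.
            return df.repartition(optimal_partitions)
        return df


    spark = SparkSession.builder.master("local[2]").getOrCreate()
    df = spark.range(100)
    print(repartition_if_needed(df).rdd.getNumPartitions())  # unchanged, no shuffle
    print(repartition_if_needed(df, 4).rdd.getNumPartitions())  # 4

When num_processors is omitted, the dataframe keeps its current partitioning and no shuffle is planned at all.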
diff --git a/docs/source/butterfree.configs.rst b/docs/source/butterfree.configs.rst
index 2a5cc07f..20432e45 100644
--- a/docs/source/butterfree.configs.rst
+++ b/docs/source/butterfree.configs.rst
@@ -23,26 +23,6 @@ butterfree.configs.environment module
 butterfree.configs.logger module
 --------------------------------
 
-.. automodule:: butterfree.configs.logger
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-.. automodule:: butterfree.configs.logger
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-.. automodule:: butterfree.configs.logger
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-.. automodule:: butterfree.configs.logger
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
 .. automodule:: butterfree.configs.logger
    :members:
    :undoc-members:
    :show-inheritance:
diff --git a/docs/source/butterfree.constants.rst b/docs/source/butterfree.constants.rst
index de6f1cee..2008aaf0 100644
--- a/docs/source/butterfree.constants.rst
+++ b/docs/source/butterfree.constants.rst
@@ -31,28 +31,6 @@ butterfree.constants.migrations module
 butterfree.constants.spark\_constants module
 --------------------------------------------
 
-.. automodule:: butterfree.constants.migrations
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-
-.. automodule:: butterfree.constants.migrations
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-
-.. automodule:: butterfree.constants.migrations
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-
-.. automodule:: butterfree.constants.migrations
-   :members:
-   :undoc-members:
-   :show-inheritance:
 
 .. automodule:: butterfree.constants.spark_constants
    :members:
@@ -62,26 +40,6 @@ butterfree.constants.spark\_constants module
    :undoc-members:
    :show-inheritance:
 
 butterfree.constants.window\_definitions module
 -----------------------------------------------
 
-.. automodule:: butterfree.constants.window_definitions
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-.. automodule:: butterfree.constants.window_definitions
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-.. automodule:: butterfree.constants.window_definitions
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-.. automodule:: butterfree.constants.window_definitions
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
 .. automodule:: butterfree.constants.window_definitions
    :members:
    :undoc-members:
    :show-inheritance:
diff --git a/docs/source/butterfree.dataframe_service.rst b/docs/source/butterfree.dataframe_service.rst
index faf9cf54..3c8026cf 100644
--- a/docs/source/butterfree.dataframe_service.rst
+++ b/docs/source/butterfree.dataframe_service.rst
@@ -20,14 +20,6 @@ butterfree.dataframe\_service.partitioning module
    :undoc-members:
    :show-inheritance:
 
-butterfree.dataframe\_service.repartition module
-------------------------------------------------
-
-.. automodule:: butterfree.dataframe_service.repartition
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
 .. automodule:: butterfree.dataframe_service.repartition
    :members:
    :undoc-members:
    :show-inheritance:
diff --git a/setup.py b/setup.py
index e6b9f761..a748fdd1 100644
--- a/setup.py
+++ b/setup.py
@@ -1,7 +1,7 @@
 from setuptools import find_packages, setup
 
 __package_name__ = "butterfree"
-__version__ = "1.4.0"
+__version__ = "1.4.1"
 __repository_url__ = "https://github.com/quintoandar/butterfree"
 
 with open("requirements.txt") as f:
diff --git a/tests/unit/butterfree/transform/conftest.py b/tests/unit/butterfree/transform/conftest.py
index c0ebb47a..d66d1c39 100644
--- a/tests/unit/butterfree/transform/conftest.py
+++ b/tests/unit/butterfree/transform/conftest.py
@@ -1,10 +1,7 @@
-import json
 from unittest.mock import Mock
 
 import pyspark.pandas as ps
 from pyspark.sql import functions
-from pyspark.sql.functions import col
-from pyspark.sql.types import TimestampType
 from pytest import fixture
 
 from butterfree.constants import DataType
@@ -28,74 +25,6 @@ def create_dataframe(data, timestamp_col="ts"):
     return df
 
 
-def create_dataframe_from_data(
-    spark_context, spark_session, data, timestamp_col="timestamp", use_json=False
-):
-    if use_json:
-        df = spark_session.read.json(
-            spark_context.parallelize(data).map(lambda x: json.dumps(x))
-        )
-    else:
-        df = create_dataframe(data, timestamp_col=timestamp_col)
-
-    df = df.withColumn(timestamp_col, col(timestamp_col).cast(TimestampType()))
-    return df
-
-
-def create_rolling_windows_agg_dataframe(
-    spark_context, spark_session, data, timestamp_col="timestamp", use_json=False
-):
-    if use_json:
-        df = spark_session.read.json(
-            spark_context.parallelize(data).map(lambda x: json.dumps(x))
-        )
-        df = df.withColumn(
-            timestamp_col, col(timestamp_col).cast(DataType.TIMESTAMP.spark)
-        )
-    else:
-        df = create_dataframe(data, timestamp_col=timestamp_col)
-
-    return df
-
-
-def build_data(rows, base_features, dynamic_features=None):
-    """
-    Constrói uma lista de dicionários para DataFrame com recursos dinâmicos.
-
-    :param rows: Lista de tuplas com (id, timestamp, base_values, dynamic_values).
-    :param base_features: Lista de nomes de recursos base (strings).
-    :param dynamic_features: Lista de nomes de recursos dinâmicos,
-        mapeando para o índice de dynamic_values (opcional).
-    :return: Lista de dicionários para criação do DataFrame.
-    """
-    data = []
-    for row in rows:
-        id_value, timestamp_value, base_values, dynamic_values = row
-
-        entry = {
-            "id": id_value,
-            "timestamp": timestamp_value,
-        }
-
-        # Adiciona valores das features base
-        entry.update(
-            {feature: value for feature, value in zip(base_features, base_values)}
-        )
-
-        # Adiciona valores das features dinâmicas, se houver
-        if dynamic_features:
-            entry.update(
-                {
-                    feature: dynamic_values[idx]
-                    for idx, feature in enumerate(dynamic_features)
-                }
-            )
-
-        data.append(entry)
-
-    return data
-
-
 def make_dataframe(spark_context, spark_session):
     data = [
         {
@@ -162,90 +91,107 @@ def make_output_filtering_dataframe(spark_context, spark_session):
 
 
 def make_rolling_windows_agg_dataframe(spark_context, spark_session):
-    rows = [
-        (1, "2016-04-11 00:00:00", [None, None], None),
-        (1, "2016-04-12 00:00:00", [300.0, 350.0], None),
-        (1, "2016-04-19 00:00:00", [None, None], None),
-        (1, "2016-04-23 00:00:00", [1000.0, 1100.0], None),
-        (1, "2016-04-30 00:00:00", [None, None], None),
-    ]
-
-    base_features = [
-        "feature1__avg_over_1_week_rolling_windows",
-        "feature2__avg_over_1_week_rolling_windows",
+    data = [
+        {
+            "id": 1,
+            "timestamp": "2016-04-11 00:00:00",
+            "feature1__avg_over_1_week_rolling_windows": None,
+            "feature2__avg_over_1_week_rolling_windows": None,
+        },
+        {
+            "id": 1,
+            "timestamp": "2016-04-12 00:00:00",
+            "feature1__avg_over_1_week_rolling_windows": 300.0,
+            "feature2__avg_over_1_week_rolling_windows": 350.0,
+        },
+        {
+            "id": 1,
+            "timestamp": "2016-04-19 00:00:00",
+            "feature1__avg_over_1_week_rolling_windows": None,
+            "feature2__avg_over_1_week_rolling_windows": None,
+        },
+        {
+            "id": 1,
+            "timestamp": "2016-04-23 00:00:00",
+            "feature1__avg_over_1_week_rolling_windows": 1000.0,
+            "feature2__avg_over_1_week_rolling_windows": 1100.0,
+        },
+        {
+            "id": 1,
+            "timestamp": "2016-04-30 00:00:00",
+            "feature1__avg_over_1_week_rolling_windows": None,
+            "feature2__avg_over_1_week_rolling_windows": None,
+        },
     ]
-
-    data = build_data(rows, base_features)
-    return create_dataframe_from_data(spark_context, spark_session, data)
+    return create_dataframe(data, timestamp_col="timestamp")
 
 
 def make_rolling_windows_hour_slide_agg_dataframe(spark_context, spark_session):
-    rows = [
-        (1, "2016-04-11 12:00:00", [266.6666666666667, 300.0], None),
-        (1, "2016-04-12 00:00:00", [300.0, 350.0], None),
-        (1, "2016-04-12 12:00:00", [400.0, 500.0], None),
-    ]
-
-    base_features = [
-        "feature1__avg_over_1_day_rolling_windows",
-        "feature2__avg_over_1_day_rolling_windows",
+    data = [
+        {
+            "id": 1,
+            "timestamp": "2016-04-11 12:00:00",
+            "feature1__avg_over_1_day_rolling_windows": 266.6666666666667,
+            "feature2__avg_over_1_day_rolling_windows": 300.0,
+        },
+        {
+            "id": 1,
+            "timestamp": "2016-04-12 00:00:00",
+            "feature1__avg_over_1_day_rolling_windows": 300.0,
+            "feature2__avg_over_1_day_rolling_windows": 350.0,
+        },
+        {
+            "id": 1,
+            "timestamp": "2016-04-12 12:00:00",
+            "feature1__avg_over_1_day_rolling_windows": 400.0,
+            "feature2__avg_over_1_day_rolling_windows": 500.0,
+        },
     ]
-
-    data = build_data(rows, base_features)
-    return create_dataframe_from_data(spark_context, spark_session, data)
+    return create_dataframe(data, timestamp_col="timestamp")
 
 
 def make_multiple_rolling_windows_hour_slide_agg_dataframe(
     spark_context, spark_session
 ):
-    rows = [
-        (
-            1,
-            "2016-04-11 12:00:00",
-            [],
-            [266.6666666666667, 266.6666666666667, 300.0, 300.0],
-        ),
-        (1, "2016-04-12 00:00:00", [], [300.0, 300.0, 350.0, 350.0]),
-        (1, "2016-04-13 12:00:00", [], [400.0, 300.0, 500.0, 350.0]),
-        (1, "2016-04-14 00:00:00", [], [None, 300.0, None, 350.0]),
-        (1, "2016-04-14 12:00:00", [], [None, 400.0, None, 500.0]),
-    ]
-
-    dynamic_features = [
-        "feature1__avg_over_2_days_rolling_windows",
-        "feature1__avg_over_3_days_rolling_windows",
-        "feature2__avg_over_2_days_rolling_windows",
-        "feature2__avg_over_3_days_rolling_windows",
+    data = [
+        {
+            "id": 1,
+            "timestamp": "2016-04-11 12:00:00",
+            "feature1__avg_over_2_days_rolling_windows": 266.6666666666667,
+            "feature1__avg_over_3_days_rolling_windows": 266.6666666666667,
+            "feature2__avg_over_2_days_rolling_windows": 300.0,
+            "feature2__avg_over_3_days_rolling_windows": 300.0,
+        },
+        {
+            "id": 1,
+            "timestamp": "2016-04-12 00:00:00",
+            "feature1__avg_over_2_days_rolling_windows": 300.0,
+            "feature1__avg_over_3_days_rolling_windows": 300.0,
+            "feature2__avg_over_2_days_rolling_windows": 350.0,
+            "feature2__avg_over_3_days_rolling_windows": 350.0,
+        },
+        {
+            "id": 1,
+            "timestamp": "2016-04-13 12:00:00",
+            "feature1__avg_over_2_days_rolling_windows": 400.0,
+            "feature1__avg_over_3_days_rolling_windows": 300.0,
+            "feature2__avg_over_2_days_rolling_windows": 500.0,
+            "feature2__avg_over_3_days_rolling_windows": 350.0,
+        },
+        {
+            "id": 1,
+            "timestamp": "2016-04-14 00:00:00",
+            "feature1__avg_over_3_days_rolling_windows": 300.0,
+            "feature2__avg_over_3_days_rolling_windows": 350.0,
+        },
+        {
+            "id": 1,
+            "timestamp": "2016-04-14 12:00:00",
+            "feature1__avg_over_3_days_rolling_windows": 400.0,
+            "feature2__avg_over_3_days_rolling_windows": 500.0,
+        },
     ]
-
-    data = build_data(rows, [], dynamic_features=dynamic_features)
-    return create_dataframe_from_data(spark_context, spark_session, data, use_json=True)
-
-
-def create_rolling_window_dataframe(
-    spark_context, spark_session, rows, base_features, dynamic_features=None
-):
-    """
-    Cria um DataFrame com recursos de rolagem de janelas agregadas.
-
-    :param spark_context: Contexto do Spark.
-    :param spark_session: Sessão do Spark.
-    :param rows: Lista de tuplas com (id, timestamp, base_values, dynamic_values).
-    :param base_features: Lista de nomes de recursos base (strings).
-    :param dynamic_features: Lista de nomes de recursos dinâmicos,
-        mapeando para o índice de dynamic_values (opcional).
-    :return: DataFrame do Spark.
-    """
-    data = build_data(rows, base_features, dynamic_features)
-
-    # Converte a lista de dicionários em um RDD do Spark
-    rdd = spark_context.parallelize(data).map(lambda x: json.dumps(x))
-
-    # Cria o DataFrame do Spark a partir do RDD
-    df = spark_session.read.json(rdd)
-
-    # Converte a coluna "timestamp" para o tipo TIMESTAMP
-    df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))
+    return create_dataframe(data, timestamp_col="timestamp")
 
 
 def make_fs(spark_context, spark_session):
diff --git a/tests/unit/butterfree/transform/test_feature_set.py b/tests/unit/butterfree/transform/test_feature_set.py
index e907dc0a..37a69be2 100644
--- a/tests/unit/butterfree/transform/test_feature_set.py
+++ b/tests/unit/butterfree/transform/test_feature_set.py
@@ -220,7 +220,7 @@ def test_construct(
             + feature_divide.get_output_columns()
         )
         assert_dataframe_equality(result_df, feature_set_dataframe)
-        assert result_df.is_cached
+        assert not result_df.is_cached
 
     def test_construct_invalid_df(
         self, key_id, timestamp_c, feature_add, feature_divide
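Note: the reworked fixtures above feed literal dictionaries straight into the existing create_dataframe helper instead of the removed tuple-based builders. A rough standalone approximation of that helper, assuming pandas-on-Spark is available; this is illustrative, not the exact conftest code.

    import pyspark.pandas as ps
    from pyspark.sql import functions


    def dataframe_from_dicts(data, timestamp_col="timestamp"):
        # Build a Spark DataFrame from plain dicts and cast the timestamp column.
        df = ps.DataFrame(data).to_spark()
        return df.withColumn(
            timestamp_col, functions.col(timestamp_col).cast("timestamp")
        )


    rows = [
        {"id": 1, "timestamp": "2016-04-11 00:00:00", "feature": None},
        {"id": 1, "timestamp": "2016-04-12 00:00:00", "feature": 300.0},
    ]
    dataframe_from_dicts(rows).show()

Spelling each row out as a dict keeps the expected values next to their column names, which is what the rewritten fixtures optimize for.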