Commit
Fix dup code (#373)
* fix: dedup code
ralphrass authored Aug 21, 2024
1 parent 35dd929 commit f6c5db6
Showing 2 changed files with 26 additions and 36 deletions.
9 changes: 9 additions & 0 deletions butterfree/load/writers/historical_feature_store_writer.py
@@ -93,6 +93,15 @@ class HistoricalFeatureStoreWriter(Writer):
improve query performance. The data is stored in partition folders in AWS S3
based on time (per year, month and day).
>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter()
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client,
...              merge_on=["id", "timestamp"])
Passing merge_on skips the plain dataframe write and performs a Delta merge
on those columns instead. Use it when the table already exists.
"""

PARTITION_BY = [
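For readers unfamiliar with Delta merges, here is a minimal sketch of what a merge keyed on ["id", "timestamp"] looks like with the delta-spark DeltaTable API. This is an illustration only, not butterfree's internal implementation; the spark session variable and the table name are placeholders.

    from delta.tables import DeltaTable

    # Illustration only: upsert `dataframe` into an existing Delta table,
    # updating rows that match on the merge keys and inserting the rest.
    target = DeltaTable.forName(spark, "feature_store.historical")  # placeholder table name
    (
        target.alias("target")
        .merge(
            dataframe.alias("source"),
            "target.id = source.id AND target.timestamp = source.timestamp",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )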
53 changes: 17 additions & 36 deletions tests/unit/butterfree/transform/conftest.py
@@ -16,6 +16,15 @@
from butterfree.transform.utils import Function


def create_dataframe(data, timestamp_col="ts"):
    """Build a Spark DataFrame from dict records and cast its timestamp column."""
    pdf = ps.DataFrame.from_dict(data)
    df = pdf.to_spark()
    df = df.withColumn(
        TIMESTAMP_COLUMN, df[timestamp_col].cast(DataType.TIMESTAMP.spark)
    )
    return df


def make_dataframe(spark_context, spark_session):
data = [
{
@@ -54,11 +63,7 @@ def make_dataframe(spark_context, spark_session):
"nonfeature": 0,
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn(TIMESTAMP_COLUMN, df.ts.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data)


def make_filtering_dataframe(spark_context, spark_session):
@@ -71,11 +76,7 @@
{"id": 1, "ts": 6, "feature1": None, "feature2": None, "feature3": None},
{"id": 1, "ts": 7, "feature1": None, "feature2": None, "feature3": None},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn(TIMESTAMP_COLUMN, df.ts.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data)


def make_output_filtering_dataframe(spark_context, spark_session):
@@ -86,11 +87,7 @@
{"id": 1, "ts": 4, "feature1": 0, "feature2": 1, "feature3": 1},
{"id": 1, "ts": 6, "feature1": None, "feature2": None, "feature3": None},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn(TIMESTAMP_COLUMN, df.ts.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data)


def make_rolling_windows_agg_dataframe(spark_context, spark_session):
@@ -126,11 +123,7 @@ def make_rolling_windows_agg_dataframe(spark_context, spark_session):
"feature2__avg_over_1_week_rolling_windows": None,
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data, timestamp_col="timestamp")


def make_rolling_windows_hour_slide_agg_dataframe(spark_context, spark_session):
@@ -154,11 +147,7 @@ def make_rolling_windows_hour_slide_agg_dataframe(spark_context, spark_session):
"feature2__avg_over_1_day_rolling_windows": 500.0,
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data, timestamp_col="timestamp")


def make_multiple_rolling_windows_hour_slide_agg_dataframe(
@@ -202,11 +191,7 @@ def make_multiple_rolling_windows_hour_slide_agg_dataframe(
"feature2__avg_over_3_days_rolling_windows": 500.0,
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data, timestamp_col="timestamp")


def make_fs(spark_context, spark_session):
@@ -253,9 +238,7 @@ def make_fs_dataframe_with_distinct(spark_context, spark_session):
"h3": "86a8100efffffff",
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))
df = create_dataframe(data, "timestamp")

return df

@@ -283,9 +266,7 @@ def make_target_df_distinct(spark_context, spark_session):
"feature__sum_over_3_days_rolling_windows": None,
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))
df = create_dataframe(data, "timestamp")

return df

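The refactor above routes every fixture through the single create_dataframe helper instead of repeating the pandas-on-Spark conversion. Below is a self-contained sketch of the same pattern in plain PySpark; the fixture name, sample data, and TimestampType stand-in are illustrative (in butterfree, DataType.TIMESTAMP.spark plays that role).

    import pyspark.pandas as ps
    from pyspark.sql.types import TimestampType


    def create_dataframe(data, timestamp_col="ts"):
        # Convert dict records to a Spark DataFrame via pandas-on-Spark, then
        # cast the timestamp column once for every fixture. Casting an integer
        # column to timestamp treats the value as seconds since the Unix epoch.
        pdf = ps.DataFrame.from_dict(data)
        df = pdf.to_spark()
        return df.withColumn("timestamp", df[timestamp_col].cast(TimestampType()))


    def make_dataframe():
        data = [
            {"id": 1, "ts": 0, "feature1": 100},
            {"id": 1, "ts": 1, "feature1": 200},
        ]
        return create_dataframe(data)

Note that pandas-on-Spark needs an active SparkSession, which the test suite's spark_session fixture presumably supplies; the helper itself no longer takes it as a parameter.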

1 comment on commit f6c5db6

@chip-n-dale (bot) commented on f6c5db6, Aug 21, 2024

Hi @ralphrass!

The GitLeaks SecTool reported some possibly exposed credentials/secrets; how about giving them a look?

GitLeaks Alert Sync

In case of false positives, more information is available on GitLeaks FAQ
If you have any other problem or question during this process, contact us in the Security space on GChat!
