Add Delta support (#370)
* feat: delta
ralphrass authored Aug 19, 2024
1 parent 12d5e98 commit 35dd929
Showing 35 changed files with 560 additions and 54 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/publish.yml
@@ -4,11 +4,9 @@ on:
paths:
- 'setup.py'


jobs:
Pipeline:
if: github.ref == 'refs/heads/master'

runs-on: ubuntu-latest

steps:
@@ -19,7 +17,7 @@ jobs:

- uses: actions/setup-java@v4
with:
java-version: '11'
java-version: '17'
distribution: microsoft

- uses: vemonet/setup-spark@v1
17 changes: 0 additions & 17 deletions .github/workflows/skip_lint.yml

This file was deleted.

3 changes: 1 addition & 2 deletions .github/workflows/staging.yml
@@ -7,7 +7,6 @@ on:
jobs:
Pipeline:
if: github.ref == 'refs/heads/staging'

runs-on: ubuntu-latest

steps:
@@ -18,7 +17,7 @@ jobs:

- uses: actions/setup-java@v4
with:
java-version: '11'
java-version: '17'
distribution: microsoft

- uses: vemonet/setup-spark@v1
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -19,7 +19,7 @@ jobs:

- uses: actions/setup-java@v4
with:
java-version: '11'
java-version: '17'
distribution: microsoft

- uses: vemonet/setup-spark@v1
22 changes: 22 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,28 @@ Preferably use **Added**, **Changed**, **Removed** and **Fixed** topics in each

## [Unreleased]

## [1.3.5](https://github.com/quintoandar/butterfree/releases/tag/1.3.5)
* Auto create feature sets ([#368](https://github.com/quintoandar/butterfree/pull/368))

## [1.3.4](https://github.com/quintoandar/butterfree/releases/tag/1.3.4)
* Fix Cassandra Config and tests ([#366](https://github.com/quintoandar/butterfree/pull/366))

## [1.3.3](https://github.com/quintoandar/butterfree/releases/tag/1.3.3)
* Fix Cassandra Config and Numpy version ([#364](https://github.com/quintoandar/butterfree/pull/364))

## [1.3.2](https://github.com/quintoandar/butterfree/releases/tag/1.3.2)
* Fix publish script ([#362](https://github.com/quintoandar/butterfree/pull/362))

## [1.3.1](https://github.com/quintoandar/butterfree/releases/tag/1.3.1)
* Timestamp NTZ available ([#360](https://github.com/quintoandar/butterfree/pull/360))

## [1.3.0](https://github.com/quintoandar/butterfree/releases/tag/1.3.0)
* Bump versions ([#355](https://github.com/quintoandar/butterfree/pull/355))
* Sphinx version ([#356](https://github.com/quintoandar/butterfree/pull/356))

## [1.2.4](https://github.com/quintoandar/butterfree/releases/tag/1.2.4)
* Auto create feature sets ([#351](https://github.com/quintoandar/butterfree/pull/351))

3 changes: 2 additions & 1 deletion Makefile
@@ -36,7 +36,7 @@ minimum-requirements:

.PHONY: requirements
## install all requirements
requirements: requirements-test requirements-lint dev-requirements minimum-requirements
requirements: minimum-requirements dev-requirements requirements-test requirements-lint

.PHONY: ci-install
ci-install:
@@ -146,6 +146,7 @@ package-name:
.PHONY: package
## build butterfree package wheel
package:
@PYTHONPATH=. pip3 install wheel
@PYTHONPATH=. python -m setup sdist bdist_wheel

.PHONY: update-docs
1 change: 1 addition & 0 deletions butterfree/clients/spark_client.py
@@ -30,6 +30,7 @@ def conn(self) -> SparkSession:
"""
if not self._session:
self._session = SparkSession.builder.getOrCreate()

return self._session

def read(
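The conn property touched above lazily creates the SparkSession on first access and caches it on the client (this particular hunk only adjusts spacing). A minimal sketch of that behavior — only SparkClient and its conn property come from this repository, the rest is illustrative:

from butterfree.clients import SparkClient

client = SparkClient()

# First access creates (or reuses) the active SparkSession and stores it
# on the client instance.
first = client.conn

# Later accesses return the cached session instead of going back to
# SparkSession.builder.getOrCreate().
second = client.conn

assert first is second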
3 changes: 2 additions & 1 deletion butterfree/load/writers/__init__.py
@@ -1,8 +1,9 @@
"""Holds data loaders for historical and online feature store."""

from butterfree.load.writers.delta_writer import DeltaWriter
from butterfree.load.writers.historical_feature_store_writer import (
HistoricalFeatureStoreWriter,
)
from butterfree.load.writers.online_feature_store_writer import OnlineFeatureStoreWriter

__all__ = ["HistoricalFeatureStoreWriter", "OnlineFeatureStoreWriter"]
__all__ = ["HistoricalFeatureStoreWriter", "OnlineFeatureStoreWriter", "DeltaWriter"]
162 changes: 162 additions & 0 deletions butterfree/load/writers/delta_writer.py
@@ -0,0 +1,162 @@
from delta.tables import DeltaTable
from pyspark.sql.dataframe import DataFrame

from butterfree.clients import SparkClient
from butterfree.configs.logger import __logger

logger = __logger("delta_writer", True)


class DeltaWriter:
"""Control operations on Delta Tables.
Responsible for merging and optimizing.
"""

@staticmethod
def _get_full_table_name(table, database):
if database:
return "{}.{}".format(database, table)
else:
return table

@staticmethod
def _convert_to_delta(client: SparkClient, table: str):
logger.info(f"Converting {table} to Delta...")
client.conn.sql(f"CONVERT TO DELTA {table}")
logger.info("Conversion done.")

@staticmethod
def merge(
client: SparkClient,
database: str,
table: str,
merge_on: list,
source_df: DataFrame,
when_not_matched_insert_condition: str = None,
when_matched_update_condition: str = None,
when_matched_delete_condition: str = None,
):
"""
Merge a source dataframe to a Delta table.
By default, it will update when matched, and insert when
not matched (simple upsert).
You can change this behavior by setting:
- when_not_matched_insert_condition: it will only insert
when this specified condition is true
- when_matched_update_condition: it will only update when this
specified condition is true. You can refer to the columns
in the source dataframe as source.<column_name>, and the columns
in the target table as target.<column_name>.
- when_matched_delete_condition: it will add an operation to delete,
but only if this condition is true. Again, source and
target dataframe columns can be referred to respectively as
source.<column_name> and target.<column_name>
"""
try:
full_table_name = DeltaWriter._get_full_table_name(table, database)

table_exists = client.conn.catalog.tableExists(full_table_name)

if table_exists:
pd_df = client.conn.sql(
f"DESCRIBE TABLE EXTENDED {full_table_name}"
).toPandas()
provider = (
pd_df.reset_index()
.groupby(["col_name"])["data_type"]
.aggregate("first")
.Provider
)
table_is_delta = provider.lower() == "delta"

if not table_is_delta:
DeltaWriter()._convert_to_delta(client, full_table_name)

# For schema evolution
client.conn.conf.set(
"spark.databricks.delta.schema.autoMerge.enabled", "true"
)

target_table = DeltaTable.forName(client.conn, full_table_name)
join_condition = " AND ".join(
[f"source.{col} = target.{col}" for col in merge_on]
)
merge_builder = target_table.alias("target").merge(
source_df.alias("source"), join_condition
)
if when_matched_delete_condition:
merge_builder = merge_builder.whenMatchedDelete(
condition=when_matched_delete_condition
)

merge_builder.whenMatchedUpdateAll(
condition=when_matched_update_condition
).whenNotMatchedInsertAll(
condition=when_not_matched_insert_condition
).execute()
except Exception as e:
logger.error(f"Merge operation on {full_table_name} failed: {e}")

@staticmethod
def vacuum(table: str, retention_hours: int, client: SparkClient):
"""Vacuum a Delta table.
Vacuum remove unused files (files not managed by Delta + files
that are not in the latest state).
After vacuum it's impossible to time travel to versions
older than the `retention` time.
Default retention is 7 days. Lower retentions will be warned,
unless it's set to false.
Set spark.databricks.delta.retentionDurationCheck.enabled
to false for low retentions.
https://docs.databricks.com/en/sql/language-manual/delta-vacuum.html
"""

command = f"VACUUM {table} RETAIN {retention_hours} HOURS"
logger.info(f"Running vacuum with command {command}")
client.conn.sql(command)
logger.info(f"Vacuum successful for table {table}")

@staticmethod
def optimize(
client: SparkClient,
table: str = None,
z_order: list = None,
date_column: str = "timestamp",
from_date: str = None,
auto_compact: bool = False,
optimize_write: bool = False,
):
"""Optimize a Delta table.
For auto-compaction and optimize write DBR >= 14.3 LTS
and Delta >= 3.1.0 are MANDATORY.
For z-ordering DBR >= 13.3 LTS and Delta >= 2.0.0 are MANDATORY.
Auto-compaction (recommended) reduces the small file problem
(overhead due to lots of metadata).
Z-order by columns that is commonly used in queries
predicates and has a high cardinality.
https://docs.delta.io/latest/optimizations-oss.html
"""

if auto_compact:
client.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

if optimize_write:
client.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")

if table:
command = f"OPTIMIZE {table}"

if from_date:
command += f"WHERE {date_column} >= {from_date}"

if z_order:
command += f" ZORDER BY {','.join(z_order)}"

logger.info(f"Running optimize with command {command}...")
client.conn.sql(command)
logger.info(f"Optimize successful for table {table}.")
27 changes: 20 additions & 7 deletions butterfree/load/writers/historical_feature_store_writer.py
@@ -14,6 +14,7 @@
from butterfree.dataframe_service import repartition_df
from butterfree.hooks import Hook
from butterfree.hooks.schema_compatibility import SparkTableSchemaCompatibilityHook
from butterfree.load.writers.delta_writer import DeltaWriter
from butterfree.load.writers.writer import Writer
from butterfree.transform import FeatureSet

@@ -114,13 +115,15 @@ def __init__(
interval_mode: bool = False,
check_schema_hook: Optional[Hook] = None,
row_count_validation: bool = True,
merge_on: list = None,
):
super(HistoricalFeatureStoreWriter, self).__init__(
db_config or MetastoreConfig(),
debug_mode,
interval_mode,
False,
row_count_validation,
merge_on,
)
self.database = database or environment.get_variable(
"FEATURE_STORE_HISTORICAL_DATABASE"
@@ -141,6 +144,7 @@ def write(
feature_set: object processed with feature set information.
dataframe: spark dataframe containing data from a feature set.
spark_client: client for spark connections with external services.
merge_on: when provided, the write is performed as an upsert (merge) into a Delta table.
If the debug_mode is set to True, a temporary table with a name in the format:
historical_feature_store__{feature_set.name} will be created instead of writing
@@ -174,13 +178,22 @@

s3_key = os.path.join("historical", feature_set.entity, feature_set.name)

spark_client.write_table(
dataframe=dataframe,
database=self.database,
table_name=feature_set.name,
partition_by=self.PARTITION_BY,
**self.db_config.get_options(s3_key),
)
if self.merge_on:
DeltaWriter.merge(
client=spark_client,
database=self.database,
table=feature_set.name,
merge_on=self.merge_on,
source_df=dataframe,
)
else:
spark_client.write_table(
dataframe=dataframe,
database=self.database,
table_name=feature_set.name,
partition_by=self.PARTITION_BY,
**self.db_config.get_options(s3_key),
)

def _assert_validation_count(
self, table_name: str, written_count: int, dataframe_count: int
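The hunks above thread the new merge_on argument from HistoricalFeatureStoreWriter down to DeltaWriter.merge: when it is provided, write() performs a Delta upsert instead of the plain spark_client.write_table call. A minimal sketch of the two modes — key column names are illustrative and the feature-set and pipeline setup is omitted:

from butterfree.load.writers import HistoricalFeatureStoreWriter

# Unchanged behaviour: without merge_on, data is written via write_table
# using the writer's default partition columns.
append_writer = HistoricalFeatureStoreWriter()

# New behaviour: with merge_on, write() calls DeltaWriter.merge and upserts
# rows that match on id + timestamp, so reprocessing a window updates
# existing rows instead of duplicating them.
upsert_writer = HistoricalFeatureStoreWriter(
    merge_on=["id", "timestamp"],
)

Either writer is then attached to the pipeline's load step as before; merge_on only changes how the historical table is materialized.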
2 changes: 2 additions & 0 deletions butterfree/load/writers/writer.py
@@ -27,6 +27,7 @@ def __init__(
interval_mode: Optional[bool] = False,
write_to_entity: Optional[bool] = False,
row_count_validation: Optional[bool] = True,
merge_on: Optional[list] = None,
) -> None:
super().__init__()
self.db_config = db_config
@@ -35,6 +36,7 @@ self.interval_mode = interval_mode
self.interval_mode = interval_mode
self.write_to_entity = write_to_entity
self.row_count_validation = row_count_validation
self.merge_on = merge_on

def with_(
self, transformer: Callable[..., DataFrame], *args: Any, **kwargs: Any
6 changes: 6 additions & 0 deletions docs/source/butterfree.clients.rst
@@ -4,18 +4,24 @@ butterfree.clients package
Submodules
----------

butterfree.clients.abstract\_client module
------------------------------------------

.. automodule:: butterfree.clients.abstract_client
:members:
:undoc-members:
:show-inheritance:

butterfree.clients.cassandra\_client module
-------------------------------------------

.. automodule:: butterfree.clients.cassandra_client
:members:
:undoc-members:
:show-inheritance:

butterfree.clients.spark\_client module
---------------------------------------

.. automodule:: butterfree.clients.spark_client
:members:

1 comment on commit 35dd929

@chip-n-dale (bot) commented on 35dd929 Aug 19, 2024

Hi @ralphrass!

The GitLeaks SecTool reported some possibly exposed credentials/secrets, how about giving them a look?

GitLeaks Alert Sync
[
  {
    "line": "    webhook: REDACTED",
    "lineNumber": 141,
    "offender": "REDACTED",
    "offenderEntropy": -1,
    "commit": "b6a5daf28abc035f74b9685aab573d384680b9d1",
    "repo": "butterfree",
    "repoURL": "",
    "leakURL": "",
    "rule": "Slack Webhook",
    "commitMessage": "initial commit\n",
    "author": "Alvaro",
    "email": "alvaro.marques.andrade@gmail.com",
    "file": ".drone.yml",
    "date": "2020-01-03T14:21:51-03:00",
    "tags": "key, slack"
  },
  {
    "line": "    webhook: REDACTED",
    "lineNumber": 159,
    "offender": "REDACTED",
    "offenderEntropy": -1,
    "commit": "b6697aa708fec0c5a9e3af0b2713cee6f45ff675",
    "repo": "butterfree",
    "repoURL": "",
    "leakURL": "",
    "rule": "Slack Webhook",
    "commitMessage": "hail to the butterfree\n",
    "author": "Alvaro",
    "email": "alvaro.marques.andrade@gmail.com",
    "file": ".drone.yml",
    "date": "2020-01-03T11:07:44-03:00",
    "tags": "key, slack"
  }
]

In case of false positives, more information is available on GitLeaks FAQ
If you have any other problem or question during this process, contact us in the Security space on GChat!
