Add Delta support #370
Changes from all commits
e443db9
1058c31
adebeb9
7ba8d1a
6c3637f
788ea75
99662f6
14ff019
e787452
f1d4626
6df7534
dd8cefe
0e435be
30bc37a
e1e42eb
9854abc
bbeb402
9604191
754cc41
1236484
7e08d5a
81cecde
86522fc
9036892
516dfdf
a87954d
79f5a2b
87b97ba
f5c48ef
f6c4eeb
707fd6e
fd882e9
94d4049
30f9533
9f2ea4a
04f0320
080e6e5
9ec8d33
abb8243
4e873f5
9461472
d1e9318
4fc6c9b
butterfree/load/writers/__init__.py:

@@ -1,8 +1,9 @@
 """Holds data loaders for historical and online feature store."""

+from butterfree.load.writers.delta_writer import DeltaWriter
 from butterfree.load.writers.historical_feature_store_writer import (
     HistoricalFeatureStoreWriter,
 )
 from butterfree.load.writers.online_feature_store_writer import OnlineFeatureStoreWriter

-__all__ = ["HistoricalFeatureStoreWriter", "OnlineFeatureStoreWriter"]
+__all__ = ["HistoricalFeatureStoreWriter", "OnlineFeatureStoreWriter", "DeltaWriter"]
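
With DeltaWriter re-exported in __all__, the new writer can be imported straight from the writers package. A minimal sketch of the import path implied by this diff (nothing beyond what the diff itself adds):

from butterfree.load.writers import (
    DeltaWriter,
    HistoricalFeatureStoreWriter,
    OnlineFeatureStoreWriter,
)

# DeltaWriter only exposes static methods (merge, vacuum, optimize),
# so it can be used without instantiating it.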
butterfree/load/writers/delta_writer.py (new file):

@@ -0,0 +1,162 @@
from delta.tables import DeltaTable
from pyspark.sql.dataframe import DataFrame

from butterfree.clients import SparkClient
from butterfree.configs.logger import __logger

logger = __logger("delta_writer", True)


class DeltaWriter:
    """Control operations on Delta Tables.

    Responsible for merging and optimizing.
    """

    @staticmethod
    def _get_full_table_name(table, database):
        if database:
            return "{}.{}".format(database, table)
        else:
            return table

    @staticmethod
    def _convert_to_delta(client: SparkClient, table: str):
        logger.info(f"Converting {table} to Delta...")
        client.conn.sql(f"CONVERT TO DELTA {table}")
        logger.info("Conversion done.")

    @staticmethod
    def merge(
        client: SparkClient,
        database: str,
        table: str,
        merge_on: list,
        source_df: DataFrame,
        when_not_matched_insert_condition: str = None,
        when_matched_update_condition: str = None,
        when_matched_delete_condition: str = None,
    ):
        """
        Merge a source dataframe into a Delta table.

        By default, it will update when matched, and insert when
        not matched (simple upsert).

        You can change this behavior by setting:
        - when_not_matched_insert_condition: it will only insert
          when this specified condition is true
        - when_matched_update_condition: it will only update when this
          specified condition is true. You can refer to the columns
          in the source dataframe as source.<column_name>, and the columns
          in the target table as target.<column_name>.
        - when_matched_delete_condition: it will add an operation to delete,
          but only if this condition is true. Again, source and
          target dataframe columns can be referred to respectively as
          source.<column_name> and target.<column_name>
        """
        try:
            full_table_name = DeltaWriter._get_full_table_name(table, database)

            table_exists = client.conn.catalog.tableExists(full_table_name)

            if table_exists:
                # DESCRIBE TABLE EXTENDED returns only table metadata, so this
                # stays cheap regardless of the table size.
                pd_df = client.conn.sql(
                    f"DESCRIBE TABLE EXTENDED {full_table_name}"
                ).toPandas()
                provider = (
                    pd_df.reset_index()
                    .groupby(["col_name"])["data_type"]
                    .aggregate("first")
                    .Provider
                )
                table_is_delta = provider.lower() == "delta"

                if not table_is_delta:
                    DeltaWriter()._convert_to_delta(client, full_table_name)

            # For schema evolution
            client.conn.conf.set(
                "spark.databricks.delta.schema.autoMerge.enabled", "true"
            )

            target_table = DeltaTable.forName(client.conn, full_table_name)
            join_condition = " AND ".join(
                [f"source.{col} = target.{col}" for col in merge_on]
            )
            merge_builder = target_table.alias("target").merge(
                source_df.alias("source"), join_condition
            )
            if when_matched_delete_condition:
                merge_builder = merge_builder.whenMatchedDelete(
                    condition=when_matched_delete_condition
                )

            merge_builder.whenMatchedUpdateAll(
                condition=when_matched_update_condition
            ).whenNotMatchedInsertAll(
                condition=when_not_matched_insert_condition
            ).execute()
        except Exception as e:
            logger.error(f"Merge operation on {full_table_name} failed: {e}")

    @staticmethod
    def vacuum(table: str, retention_hours: int, client: SparkClient):
Review comment on the `vacuum` signature:
The docstring tells us the default retention time is 7 days. Should it be set as a default value for this parameter?

Reply: This is Delta's default.
        """Vacuum a Delta table.

        Vacuum removes unused files (files not managed by Delta, plus files
        that are not in the latest table state).
        After a vacuum it is impossible to time travel to versions
        older than the retention period.
        Delta's default retention is 7 days. Lower retention values trigger
        a warning unless spark.databricks.delta.retentionDurationCheck.enabled
        is set to false.
        https://docs.databricks.com/en/sql/language-manual/delta-vacuum.html
        """
        command = f"VACUUM {table} RETAIN {retention_hours} HOURS"
        logger.info(f"Running vacuum with command {command}")
        client.conn.sql(command)
        logger.info(f"Vacuum successful for table {table}")

    @staticmethod
    def optimize(
        client: SparkClient,
        table: str = None,
        z_order: list = None,
        date_column: str = "timestamp",
        from_date: str = None,
        auto_compact: bool = False,
        optimize_write: bool = False,
    ):
        """Optimize a Delta table.

        For auto-compaction and optimized writes, DBR >= 14.3 LTS
        and Delta >= 3.1.0 are MANDATORY.
        For z-ordering, DBR >= 13.3 LTS and Delta >= 2.0.0 are MANDATORY.
        Auto-compaction (recommended) reduces the small-file problem
        (overhead due to lots of metadata).
        Z-order by columns that are commonly used in query
        predicates and have a high cardinality.
        https://docs.delta.io/latest/optimizations-oss.html
        """
        if auto_compact:
            client.conn.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

        if optimize_write:
            client.conn.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")

        if table:
            command = f"OPTIMIZE {table}"

            if from_date:
                command += f" WHERE {date_column} >= {from_date}"

            if z_order:
                command += f" ZORDER BY {','.join(z_order)}"

            logger.info(f"Running optimize with command {command}...")
            client.conn.sql(command)
            logger.info(f"Optimize successful for table {table}.")
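
A short usage sketch of the three public operations, based only on the signatures and docstrings above. The database, table, and column names are hypothetical, and it assumes a Delta-enabled Spark environment reachable through butterfree's SparkClient:

from butterfree.clients import SparkClient
from butterfree.load.writers import DeltaWriter

spark_client = SparkClient()  # assumes a Delta-enabled Spark session

# Hypothetical source dataframe; in practice this would come from a feature set.
new_features_df = spark_client.conn.createDataFrame(
    [(1, "2024-01-02"), (2, "2024-01-02")], ["id", "ts"]
)

# Upsert keyed on "id": update matched rows, insert unmatched ones.
DeltaWriter.merge(
    client=spark_client,
    database="feature_store",   # hypothetical database
    table="user_features",      # hypothetical table
    merge_on=["id"],
    source_df=new_features_df,
    when_matched_update_condition="source.ts > target.ts",  # only overwrite with newer rows
)

# Remove files outside the default 7-day retention window (168 hours).
DeltaWriter.vacuum(
    table="feature_store.user_features", retention_hours=168, client=spark_client
)

# Compact small files and z-order by a high-cardinality column used in predicates.
DeltaWriter.optimize(
    client=spark_client,
    table="feature_store.user_features",
    z_order=["id"],
    auto_compact=True,
    optimize_write=True,
)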
Review comment on the provider check in `merge`:
If I understood correctly, if the table exists, we're converting it to Delta, right?
Since we're using pandas, is there any chance that the pandas representation might consume too much memory based on the size of the table, or is this pandas representation just storing some metadata that doesn't get bigger when the table gets bigger?

Reply: Yes, we are converting. DESCRIBE TABLE is a very simple query; it has the same resource consumption no matter the size of the table. That is a trick found to correctly identify the table type without using the path.
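
To illustrate the reply above: DESCRIBE TABLE EXTENDED returns a small metadata result (one row per property), so the pandas conversion stays constant-size no matter how many records the table holds. A minimal standalone sketch of the same provider check, assuming an active Spark session and an existing table my_db.my_table (hypothetical name):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A few dozen metadata rows (col_name, data_type, comment), independent of table size.
metadata = spark.sql("DESCRIBE TABLE EXTENDED my_db.my_table").toPandas()

provider = (
    metadata.reset_index()
    .groupby(["col_name"])["data_type"]
    .aggregate("first")
    .Provider
)
print(provider.lower() == "delta")  # True for Delta tables, False for e.g. plain Parquet tables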