Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Delta support #370

Merged
merged 43 commits into from
Aug 19, 2024
Merged

Add Delta support #370

merged 43 commits into from
Aug 19, 2024

Conversation

ralphrass
Copy link
Contributor

@ralphrass ralphrass commented Aug 9, 2024

Why? 📖

Add support to Delta format. Delta is an open source project that improves the concept of a data lake (or a feature store). It provides ACID transactions, time travel on data, schema evolution, upserts and a couple more features. With Delta we can improve performance of ETL jobs as well as improve management as Delta is fully supported by common tools like Databricks.

What? 🔧

The strategy here was to add Delta support while maintaining compatibility with older versions of ETL environments such as Databricks. However, Python version has to be updated, because recent DBR versions uses it. Still, great features of Delta are only available in versions 2.0 and above such as OPTIMIZE and VACUUM. Also, it is important to note that only these versions of Delta support dynamic partition overwrite, a feature of Spark that we use in incremental jobs.

The usage of Delta don't require change if the loaded data are in Parquet. The adoption of Delta is highly recommended, as it enable ACID transactions and features that transform data lakes in a modern and concise data warehouse.

  • Add delta-spark as a requirement
  • Add DeltaWriter as an independent writer
  • Enable local tests with Delta
  • Enable upserts, optimizations and vacuum

Type of change

Please delete options that are not relevant.

  • New feature (non-breaking change which adds functionality)

How everything was tested? 📏

Unit testing
Databricks testing (ongoing)

Attention Points ⚠️

Replace me for what the reviewer will need to pay attention to in the PR or just to cover any concerns after the merge.

hmeretti and others added 13 commits February 12, 2021 14:48
* [MLOP-634] Butterfree dev workflow, set triggers for branches staging and master (#280)

* Change github actions pipelines.

* Change pipeline logic.

* [BUG] Fix Staging GithubActions Pipeline (#283)

* New step on pipelie.

* Some adjusts.

* Apply only wheel. (#285)

* [BUG] Change version on setup.py to PyPI (#286)

* Add new make command to change version.

* Change command order.

* Change desc and variable name.

* Change command name.

* Keep milliseconds when using 'from_ms' argument in timestamp feature (#284)

* changed timestamp resolution

* fix import

* simple refactor

Co-authored-by: Henrique Camargo <henriquecamargo@spf1lt-pj000560.ldap.quintoandar.com.br>

* Change trigger for pipeline staging (#287)

* Change trigger to publish dev pipeline.

* Some fix.

* Create a dev package. (#288)

* [MLOP-633] Butterfree dev workflow, update documentation (#281)

* Update workflow doc.

* Update README

* Add pre-release.

* Fix typo.

* [MLOP-632] Butterfree dev workflow, automate release description (#279)

* release 1.1.4

* update changelog

Co-authored-by: Mayara Moromisato <44944954+moromimay@users.noreply.github.com>
Co-authored-by: Henrique Camargo <henriquecamargo@spf1lt-pj000560.ldap.quintoandar.com.br>
Co-authored-by: AlvaroMarquesAndrade <45604858+AlvaroMarquesAndrade@users.noreply.github.com>
* [MLOP-636] Create migration classes (#282)

* [MLOP-635] Rebase Incremental Job/Interval Run branch for test on selected feature sets (#278)

* Add interval branch modifications.

* Add interval_runs notebook.

* Add tests.

* Apply style (black, flack8 and mypy).

* Fix tests.

* Change version to create package dev.

* Allow slide selection (#293)

* Fix Slide Duration Typo (#295)

* [MLOP-637] Implement diff method (#292)

* [MLOP-640] Create CLI with migrate command (#298)

* [MLOP-645] Implement query method, cassandra (#291)

* [MLOP-671] Implement get_schema on Spark client (#301)

* [MLOP-648] Implement query method, metastore (#294)

* Fix Validation Step (#302)

* [MLOP-647] [MLOP-646] Apply migrations (#300)

* add apply migration method

* add test apply migration

* add migrate actor with tests

* mypy compliant

* fix test interaction with mocked object

* Rebase and some adjusts.

Co-authored-by: Mayara Moromisato <may.alveslima@gmail.com>

* [BUG] Apply create_partitions to historical validate (#303)

* Apply create_partitions to historical validate.

* Remove comments and adjusts.

* [BUG] Fix key path for validate read (#304)

* Fix key path

* bump version

Co-authored-by: AlvaroMarquesAndrade <1a789766b1c4c8b679e80f11fa6d63d42fa4bcdf>

* [FIX] Add Partition types for Metastore (#305)

* [MLOP-639] Track logs in S3 (#306)

* Apply tracking logs and logging config.

* Adjusts in CLI and logging.conf.

* Some adjusts.

* Change version to generate new dev package

* Fix version.

* Apply style.

* Add new assert in the migrate unit test.

* [BUG] Change logging config (#307)

* Change logging config.

* Some adjusts.

* Remove a code smell.

* Change solution for tracking logs (#308)

* Change tracking logs method.

* Change version to generate dev package.

* Change path name in S3

* Read and write consistency level options (#309)

* modify cassandra client to be region aware

* add option for the user to set read and write consistency levels on cassandra config

* add tests

* use env vars instead

* Update butterfree/configs/db/cassandra_config.py

Co-authored-by: Rodrigo Martins de Oliveira <allrod5@users.noreply.github.com>

* Update butterfree/configs/db/cassandra_config.py

Co-authored-by: Rodrigo Martins de Oliveira <allrod5@users.noreply.github.com>

Co-authored-by: Rodrigo Martins de Oliveira <allrod5@users.noreply.github.com>

* Fix kafka reader. (#310)

* Fix path validate. (#311)

* Add local dc property (#312)

* add local dc property

* update version

* Remove metastore migrate (#313)

* Remove metastore migrate.

* Change version to create a dev package.

* Fix link in our docs. (#315)

* [BUG] Fix Cassandra Connect Session (#316)

* Fix Cassandra Connect Session.

* Apply style.

* Fix migration query. (#318)

* Fix migration query add type key. (#319)

* Fix db-config condition (#321)

* Fix db-config condition.

* Apply style.

* MLOP-642 Document migration in Butterfree (#320)

* update docs

* add more information and reference new cli.md file

* [MLOP-702] Debug mode for Automate Migration (#322)

* Create flag debug-mode.

* Fix tests.

* Fix migrate test.

* [MLOP-727] Improve logging messages (#325)

* Fix logging message for local file

* Remove json import

* [MLOP-728] Improve logging messages (#324)

* Improve logs.

* Revert debug-mode condition.

* Fix method to generate agg feature name. (#326)

* [MLOP-691]  Include step to add partition to SparkMetastore during writing of Butterfree (#327)

* Change writer type for interval mode.

* Some adjusts.

* Release 1.2.0

Co-authored-by: AlvaroMarquesAndrade <45604858+AlvaroMarquesAndrade@users.noreply.github.com>
Co-authored-by: Igor Gustavo Hoelscher <19557581+roelschr@users.noreply.github.com>
Co-authored-by: Felipe Victorino Caputo <13631451+fvcaputo@users.noreply.github.com>
Co-authored-by: Rodrigo Martins de Oliveira <allrod5@users.noreply.github.com>
Co-authored-by: Gabriel Brandão <37742275+GaBrandao@users.noreply.github.com>
* Add the missing link for H3 geohash (#330)

* Add the missing link for H3 geohash

* Update the H3 geohash link.

* Update the same link 

Update the same link in in spark_function_and_window.ipynb example

* Update README.md (#331)

* Update Github Actions Workflow runner (#332)

* Update Workflow runner version

* bump flake8-bandit

* chore: bypass false positive for S105

Co-authored-by: Lucas Cardozo <lucasecardozo@gmail.com>

* Delete sphinx version. (#334)

* Update files to staging (#336)

Co-authored-by: Rodrigo Martins de Oliveira <allrod5@users.noreply.github.com>

* Update butterfree/configs/db/cassandra_config.py

Co-authored-by: Rodrigo Martins de Oliveira <allrod5@users.noreply.github.com>

Co-authored-by: Rodrigo Martins de Oliveira <allrod5@users.noreply.github.com>

* Fix kafka reader. (#310)

* Fix path validate. (#311)

* Add local dc property (#312)

* release 1.2.1

Co-authored-by: Jay Vala <24193355+jdvala@users.noreply.github.com>
Co-authored-by: Rodrigo Martins de Oliveira <allrod5@users.noreply.github.com>
Co-authored-by: Lucas Fonseca <lucas.fonseca@quintoandar.com.br>
Co-authored-by: Lucas Cardozo <lucasecardozo@gmail.com>
Co-authored-by: Felipe Victorino Caputo <13631451+fvcaputo@users.noreply.github.com>
* Less strict requirements (#333)

* bump a few requirements; increase lower bound for h3 version range; adding pyarrow dev dependency

* fix type repr for spark types; fix: broken tests (pyspark 3.4)

---------

Co-authored-by: Ralph Rassweiler <ralphrass@gmail.com>

* feat: optional row count validation (#340)

* fix: parameter, libs (#341)

---------
* feat(MLOP-1985): optional params (#347)

---------
* feat(MLOP-2145): add feature set creation script (#351)

* feat: add feature set creation script

* feat(mlop-2145): updating auto fs creation (#352)

* feat(updating-auto-fs-creation): adding methods to the class as private and add Table dataclass

* feat(updating-auto-fs-creation): using dataclass and adding typing

* feat(updating-auto-fs-creation): finish using all type hints and apply format

* feat(updating-auto-fs-creation): add docstring and auto-infer by df

* fix(updating-auto-fs-creation): remove unused format

* feat(updating-auto-fs-creation): creating flake8 ignore list

* feat(updating-auto-fs-creation): apply fmt

* feat(updating-auto-fs-creation): init file

* feat(updating-auto-fs-creation): making more readable

* feat(updating-auto-fs-creation): remove wrong file

* feat(updating-auto-fs-creation): apply fmt

* feat(updating-auto-fs-creation): ignoring mypy

* feat(updating-auto-fs-creation): add unit test

* feat(updating-auto-fs-creation): using Dataframe from pyspark

---------
* feat(mlop-2269): bump versions (#355)

* fix: bump versions adjust tests

* add checklist

* chore: bump python

* bump pyspark

* chore: java version all steps modified

* fix: sphinx version (#356)
* feat(MLOP-2236): add NTZ (#360)

* feat: NTZ and new tests
* fix: package
* fix: to lower case
* pin numpy
* fix: Cassandra config keys (#366)
* fix: new type (#368)
@ralphrass ralphrass self-assigned this Aug 9, 2024
Copy link

Check out this pull request on  ReviewNB

See visual diffs & provide feedback on Jupyter Notebooks.


Powered by ReviewNB

@ralphrass ralphrass changed the title Ralph/add delta support Add delta support Aug 13, 2024
@ralphrass ralphrass changed the title Add delta support Add Delta support Aug 13, 2024
@ralphrass ralphrass marked this pull request as ready for review August 15, 2024 15:29
@ralphrass ralphrass requested a review from a team as a code owner August 15, 2024 15:29
full_table_name = DeltaWriter._get_full_table_name(table, database)

table_exists = client.conn.catalog.tableExists(full_table_name)
# table_is_delta = DeltaTable.isDeltaTable(client.conn, path)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this?

Comment on lines +66 to +78
pd_df = client.conn.sql(
f"DESCRIBE TABLE EXTENDED {full_table_name}"
).toPandas()
provider = (
pd_df.reset_index()
.groupby(["col_name"])["data_type"]
.aggregate("first")
.Provider
)
table_is_delta = provider.lower() == "delta"

if not table_is_delta:
DeltaWriter()._convert_to_delta(client, full_table_name)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understood correctly, if the table exists, we're converting it to delta, right?

Since we're using Pandas, is there any change that the Pandas representation might consume too much memory based on the size of the table or this Pandas representation is just storing some metadata that doesn't get bigger when the table gets bigger?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we are converting. Describe table is a very simple query, it's the same resource consumption no matter the size of the table. That is a trick found to correctly identify the table type without using path.

logger.error(f"Merge operation on {full_table_name} failed: {e}")

@staticmethod
def vacuum(table: str, retention_hours: int, client: SparkClient):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring tells us the default retention time is 7 days.

Should it be set as a default value for this parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is Delta's default.

logging.json Outdated

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this file necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

Copy link

sonarcloud bot commented Aug 19, 2024

@ralphrass ralphrass merged commit 35dd929 into staging Aug 19, 2024
4 checks passed
ralphrass added a commit that referenced this pull request Aug 22, 2024
* Add Delta support (#370)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants