feat(ingestion/hive-metastore): add upstream lineage to hive-metastore #15435

base: master

Conversation
| description="Simplify v2 field paths to v1 by default. If the schema has Union or Array types, still falls back to v2", | ||
| ) | ||
|
|
||
| emit_storage_lineage: bool = Field( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we usually call this emit_... or include_...?
| default=False, | ||
| description="Whether to emit storage-to-Hive lineage", | ||
| ) | ||
```python
hive_storage_lineage_direction: str = Field(
```

this ability to choose direction, do we have something similar in other sources? anyway, it could be a `Literal`: https://docs.pydantic.dev/1.10/usage/types/#literal-type

We also have this already in the main hive source (i.e. not the hive metastore source). Therefore adding it here also for consistency. One of the main reasons for this is that Spark can often only show lineage into the files rather than the metastore, depending on how the table is updated.
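For illustration, a minimal sketch of the `Literal` suggestion (the class name matches the PR description; the default value is an assumption):

```python
from typing import Literal

from pydantic import BaseModel, Field


class HiveStorageLineageConfig(BaseModel):
    # Literal rejects anything other than the two allowed directions at
    # validation time, instead of accepting an arbitrary str.
    hive_storage_lineage_direction: Literal["upstream", "downstream"] = Field(
        default="upstream",  # assumed default
        description="Direction of storage-to-Hive lineage",
    )


# HiveStorageLineageConfig(hive_storage_lineage_direction="sideways")
# -> raises pydantic.ValidationError
```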
```python
if self._COMPLEX_TYPE.match(fields[0].nativeDataType) and isinstance(
    fields[0].type.type, NullTypeClass
):
    assert len(fields) == 1
```

Dangerous use of assert - low severity

When running Python in optimized mode, `assert` calls are not executed. This mode is enabled with the `-O` command line flag or the `PYTHONOPTIMIZE` environment variable, and optimized mode is usually on in production, so any safety check done using `assert` will not be executed.

Remediation: raise an exception instead of using `assert`.

View details in Aikido Security
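A minimal sketch of the scanner's suggested remediation applied to the snippet above (the exception type and message are illustrative):

```python
# Replace the bare assert with an explicit exception, so the check also
# runs under PYTHONOPTIMIZE / -O, where asserts are stripped.
if self._COMPLEX_TYPE.match(fields[0].nativeDataType) and isinstance(
    fields[0].type.type, NullTypeClass
):
    if len(fields) != 1:
        raise ValueError(
            f"Expected a single field for complex type, got {len(fields)}"
        )
```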
Great docs!

Still, a new user may wonder: which one should I use, hive or hive-metastore? Is hive going to be deprecated? Is there feature parity, or what feature does one have that the other misses?

And for existing users of hive: what's the plan? Is it going to be deprecated eventually? Are new features going to be implemented in the hive one too?
This creates URNs like:

```
urn:li:dataset:(urn:li:dataPlatform:hive,database.table,prod-hive)
```

both hive and hive-metastore generate URNs with the hive platform in the URN, right? so if a user moves from hive to hive-metastore, that won't be a breaking change for the identity of the assets, right?

That is correct. You can move between the two and the URNs generated are ultimately the same, just built via different approaches (metastore db vs thrift). This hasn't changed.
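For reference, a minimal sketch of building a dataset URN of this shape with datahub's builder helper (the name and env values here are illustrative; the third URN component is the configured env):

```python
from datahub.emitter.mce_builder import make_dataset_urn

# Builds a Hive dataset URN; both sources produce the same shape, which is
# why switching between them keeps asset identity stable.
urn = make_dataset_urn(platform="hive", name="database.table", env="PROD")
# -> "urn:li:dataset:(urn:li:dataPlatform:hive,database.table,PROD)"
```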
```diff
  # add view properties
- properties: Dict[str, str] = {
-     "is_view": "True",
- }
+ properties: Dict[str, str] = {"is_view": "True"}
```

this is not new behaviour; anyway, why not use the SubTypes aspect here to track this as a view?

happy to make this a view subtype - just didn't want to introduce it since it is a change in the connector. Happy to do so if you think that's wise.
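If the SubTypes route is taken, a minimal sketch (the helper name is hypothetical; `"View"` is assumed to match the conventional subtype string):

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import SubTypesClass


# Hypothetical helper: emit a SubTypes aspect marking the dataset as a view,
# rather than encoding it as an "is_view" custom property.
def make_view_subtype_mcp(dataset_urn: str) -> MetadataChangeProposalWrapper:
    return MetadataChangeProposalWrapper(
        entityUrn=dataset_urn,
        aspect=SubTypesClass(typeNames=["View"]),
    )
```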
| "type": coltype, | ||
| "nullable": True, | ||
| "default": None, | ||
| "full_type": orig_col_type, # pass it through |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so full_type, this is what we really miss in the original non-patched method?
what would be the impact of missing int?
patch is a heavy tech debt, so wondering if the value worths it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is pre-existing -
datahub/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
Lines 605 to 612 in ddd9321
| result.append( | |
| { | |
| "name": col_name, | |
| "type": coltype, | |
| "nullable": True, | |
| "default": None, | |
| "full_type": orig_col_type, # pass it through | |
| "comment": _comment, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so this patch is not new? is that what you mean?
```python
HiveDialect.get_view_names = get_view_names_patched
HiveDialect.get_view_definition = get_view_definition_patched
```

`HiveDialect` comes from our fork, right? https://github.com/acryldata/PyHive — may we just fix it there?
```python
except (ValueError, TypeError, AttributeError) as e:
    logger.warning(
        f"Failed to create storage dataset MCPs for {storage_location}: {e}",
        exc_info=True,
    )
    return
```

this exception-swallowing pattern, is that what we usually do in other sources when we fail emission?
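For comparison, a minimal sketch of also surfacing the failure on the source report, assuming a `report_warning(key, reason)` helper of the kind many datahub sources expose (the `_make_storage_mcps` helper is hypothetical, and the reporter API may differ by version):

```python
try:
    yield from self._make_storage_mcps(storage_location)  # hypothetical helper
except (ValueError, TypeError, AttributeError) as e:
    logger.warning(
        f"Failed to create storage dataset MCPs for {storage_location}: {e}",
        exc_info=True,
    )
    # Assumed SourceReport-style API: records the failure in the run report
    # so it shows up in ingestion summaries, not just the logs.
    self.report.report_warning(
        storage_location, f"Failed to create storage dataset MCPs: {e}"
    )
    return
```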
the code coverage report shows little coverage in failure/exception scenarios; we could improve a little bit there, mainly in hive_source.py, and also concerning the lack of coverage for …

I'll add these today so that we can get this signed off.
Add Storage Lineage to Hive/Hive Metastore + Code Refactoring
Summary
Adds storage lineage support to Hive and Hive Metastore connectors, enabling lineage tracking between Hive tables and their underlying storage locations (S3, Azure, GCS, HDFS, DBFS). Also refactors Hive sources into a clean directory structure.
Key Changes
Storage Lineage (Opt-in Feature)
New configuration options (disabled by default):
- `emit_storage_lineage`: Enable storage lineage extraction
- `hive_storage_lineage_direction`: Set direction (`upstream` or `downstream`)
- `include_column_lineage`: Enable column-level lineage
- `storage_platform_instance`: Platform instance for storage URNs

Supported platforms: S3, Azure (ADLS/ABFS), GCS, HDFS, DBFS, local files
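For illustration, a minimal sketch of these options as the dict a recipe's `config` block would parse into (`host_port` and the instance name are assumptions about the surrounding setup):

```python
# Hypothetical recipe config enabling the new opt-in storage lineage.
hive_config = {
    "host_port": "localhost:10000",  # assumed connection setting
    "emit_storage_lineage": True,
    "hive_storage_lineage_direction": "upstream",
    "include_column_lineage": True,
    "storage_platform_instance": "prod-s3",  # illustrative instance name
}
```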
Code Refactoring
- `hive.py` → `hive/hive_source.py`
- `hive_metastore.py` → `hive/hive_metastore_source.py`
- `hive/storage_lineage.py` with shared logic
- `HiveStorageLineageConfigMixin` to eliminate duplication
- `setup.py` entry points updated to use fully qualified paths

Code Quality Improvements
- `HiveStorageLineageConfig` to Pydantic model
- `LineageDirection` and `StoragePlatform` `StrEnum`s for type safety
- `get_db_schema` (now raises `ValueError` for invalid input)
- `get_workunits_internal` (specific exceptions + proper logging)

PR Checks