Skip to content
This repository has been archived by the owner on Aug 2, 2024. It is now read-only.

Commit

Permalink
sqllineage support UNION with SELECT [sc-16158] (#11)
Browse files: browse the repository at this point in the history
* sqllineage support UNION with SELECT [sc-16158]

* fix format
  • Loading branch information
alyiwang authored Mar 14, 2023
1 parent 5505409 commit 7f95837
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 13 deletions.
2 changes: 1 addition & 1 deletion sqllineage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def _monkey_patch() -> None:
_monkey_patch()

NAME = "metaphor-sqllineage"
VERSION = "2.0.7"
VERSION = "2.0.8"
DEFAULT_LOGGING = {
"version": 1,
"disable_existing_loggers": False,
Expand Down
33 changes: 28 additions & 5 deletions sqllineage/core/analyzer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from functools import reduce
from operator import add
from typing import List, NamedTuple, Optional, Set, Union
from typing import Dict, List, NamedTuple, Optional, Set, Union

from sqlparse.sql import (
Function,
Expand All @@ -13,8 +13,9 @@

from sqllineage.core.handlers.base import CurrentTokenBaseHandler, NextTokenBaseHandler
from sqllineage.core.holders import StatementLineageHolder, SubQueryLineageHolder
from sqllineage.core.models import SubQuery, Table, TableMetadata
from sqllineage.core.models import Column, SubQuery, Table, TableMetadata
from sqllineage.exceptions import SQLLineageException
from sqllineage.utils.constant import EdgeType
from sqllineage.utils.sqlparse import (
get_subquery_parentheses,
is_subquery,
Expand All @@ -24,7 +25,7 @@

class AnalyzerContext(NamedTuple):
    """
    Context passed to the analyzer when extracting lineage from a (sub)query.

    Both fields default to None, so ``AnalyzerContext()`` describes a
    top-level statement with no enclosing subquery and no known CTEs.
    """

    # Subquery being analyzed, if any; the analyzer registers it as the
    # write target of the holder it builds.
    subquery: Optional[SubQuery] = None
    # CTEs already known, each mapped to the columns recorded for it. The
    # analyzer iterates this with ``.items()``, so it must be a mapping and
    # not a plain set of CTEs.
    prev_cte: Optional[Dict[SubQuery, Set[Column]]] = None


class LineageAnalyzer:
Expand Down Expand Up @@ -108,11 +109,15 @@ def _extract_from_dml(
holder = SubQueryLineageHolder()
if context.prev_cte is not None:
# CTE can be referenced by subsequent CTEs
for cte in context.prev_cte:
for cte, columns in context.prev_cte.items():
holder.add_cte(cte)
for column in columns:
holder.add_table_has_column(column)

if context.subquery is not None:
# If within subquery, then manually add subquery as target table
holder.add_write(context.subquery)

current_handlers = [
handler_cls() for handler_cls in CurrentTokenBaseHandler.__subclasses__()
]
Expand Down Expand Up @@ -152,8 +157,9 @@ def _extract_from_dml(

# recursively extracting each subquery of the parent and merge
for sq in subqueries:
prev_cte = cls._find_cte_columns(holder)
sq_holder = cls._extract_from_dml(
sq.token, AnalyzerContext(sq, holder.cte), metadata
sq.token, AnalyzerContext(sq, prev_cte), metadata
)
holder |= sq_holder

Expand All @@ -162,6 +168,23 @@ def _extract_from_dml(

return holder

@classmethod
def _find_cte_columns(
    cls,
    holder: SubQueryLineageHolder,
) -> Dict[SubQuery, Set[Column]]:
    """
    Finds the previous CTEs and their columns from the holder graph

    Walks every edge of ``holder.graph`` and keeps the HAS_COLUMN edges whose
    source node is one of ``holder.cte``; the edge target is recorded as a
    column of that CTE.

    NOTE(review): a CTE that has no HAS_COLUMN edge in the graph does not
    appear in the result at all (not even with an empty set) -- confirm that
    callers do not need such CTEs forwarded.
    """
    # Read the CTE set once: it does not change while the edges are walked,
    # so there is no need to fetch it again for every edge.
    known_ctes = holder.cte
    mapping: Dict[SubQuery, Set[Column]] = {}
    for src, tgt, attr in holder.graph.edges(data=True):
        if attr.get("type") == EdgeType.HAS_COLUMN and src in known_ctes:
            mapping.setdefault(src, set()).add(tgt)

    return mapping

@classmethod
def parse_subquery(cls, token: TokenList) -> List[SubQuery]:
result = []
Expand Down
11 changes: 7 additions & 4 deletions sqllineage/core/holders.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,15 @@ def cte(self) -> Set[SubQuery]:
def add_cte(self, value) -> None:
self._property_setter(value, NodeTag.CTE)

def add_table_has_column(self, col: Column) -> None:
    """
    Record that ``col`` belongs to its parent by adding a HAS_COLUMN edge
    from ``col.parent`` to ``col``. Columns whose parent is None are skipped.
    """
    if col.parent is None:
        # starting NetworkX v2.6, None is not allowed as node, see https://github.com/networkx/networkx/pull/4892
        return
    self.graph.add_edge(col.parent, col, type=EdgeType.HAS_COLUMN)

def add_column_lineage(self, src: Column, tgt: Column) -> None:
    """
    Record column-level lineage from ``src`` to ``tgt``.

    Adds a LINEAGE edge between the two columns, then registers each column
    under its parent through ``add_table_has_column``.
    """
    self.graph.add_edge(src, tgt, type=EdgeType.LINEAGE)
    # add_table_has_column skips columns whose parent is None, so neither
    # endpoint can introduce None as a graph node; no inline HAS_COLUMN
    # edges are added here to avoid duplicating that logic unguarded.
    self.add_table_has_column(src)
    self.add_table_has_column(tgt)


class StatementLineageHolder(SubQueryLineageHolder, ColumnLineageMixin):
Expand Down
40 changes: 40 additions & 0 deletions tests/test_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -1065,3 +1065,43 @@ def test_column_star_with_schema():
"sch",
DummySchemaFetcher({"db.sch.tab2": ["a1", "a2"]}),
)


def test_union():
    """
    Column lineage through a ``union all`` of two CTEs that are read with
    ``select *``: every selected column of tab1 must trace back to both
    underlying tables.
    """
    sql = """
with ss as (
select id, total
from store_sales),
ws as (
select id, total
from web_sales)
insert overwrite table tab1
select id, total
from (select *
from ss
union all
select *
from ws) tmp1
"""

    # one (source column, target column) pair per selected column and per
    # table behind the union, listed column by column
    expected = [
        (
            ColumnQualifierTuple(column, source_table),
            ColumnQualifierTuple(column, "tab1"),
        )
        for column in ("id", "total")
        for source_table in ("store_sales", "web_sales")
    ]

    assert_column_lineage_equal(sql, expected)
58 changes: 55 additions & 3 deletions tests/test_tpcds.py
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,24 @@
"catalog_sales",
},
{"query33"},
[],
[
(
ColumnQualifierTuple("i_manufact_id", "item"),
ColumnQualifierTuple("i_manufact_id", "query33"),
),
(
ColumnQualifierTuple("ss_ext_sales_price", None),
ColumnQualifierTuple("total_sales", "query33"),
),
(
ColumnQualifierTuple("cs_ext_sales_price", None),
ColumnQualifierTuple("total_sales", "query33"),
),
(
ColumnQualifierTuple("ws_ext_sales_price", None),
ColumnQualifierTuple("total_sales", "query33"),
),
],
{},
),
34: (
Expand Down Expand Up @@ -2096,6 +2113,7 @@
),
54: (
{
"my_customers",
"customer_address",
"web_sales",
"item",
Expand Down Expand Up @@ -2138,7 +2156,24 @@
"catalog_sales",
},
{"query56"},
[],
[
(
ColumnQualifierTuple("i_item_id", "item"),
ColumnQualifierTuple("i_item_id", "query56"),
),
(
ColumnQualifierTuple("ss_ext_sales_price", None),
ColumnQualifierTuple("total_sales", "query56"),
),
(
ColumnQualifierTuple("cs_ext_sales_price", None),
ColumnQualifierTuple("total_sales", "query56"),
),
(
ColumnQualifierTuple("ws_ext_sales_price", None),
ColumnQualifierTuple("total_sales", "query56"),
),
],
{},
),
57: (
Expand Down Expand Up @@ -2356,7 +2391,24 @@
"catalog_sales",
},
{"query60"},
[],
[
(
ColumnQualifierTuple("ss_ext_sales_price", None),
ColumnQualifierTuple("total_sales", "query60"),
),
(
ColumnQualifierTuple("cs_ext_sales_price", None),
ColumnQualifierTuple("total_sales", "query60"),
),
(
ColumnQualifierTuple("i_item_id", "item"),
ColumnQualifierTuple("i_item_id", "query60"),
),
(
ColumnQualifierTuple("ws_ext_sales_price", None),
ColumnQualifierTuple("total_sales", "query60"),
),
],
{},
),
61: (
Expand Down

0 comments on commit 7f95837

Please sign in to comment.