From 2fb0d504363c1ea41365ae9e62cb346fc690e7af Mon Sep 17 00:00:00 2001 From: oravidov Date: Sun, 17 Oct 2021 13:00:49 +0300 Subject: [PATCH 1/4] Fixed potential issue where database name can be overridden by the result coming back from Snowflake --- lineage/bigquery_query_history.py | 23 ++++++++++------------- lineage/query_history.py | 9 ++++++--- lineage/query_history_factory.py | 7 ++++--- lineage/snowflake_query_history.py | 28 ++++++++++++++-------------- 4 files changed, 34 insertions(+), 33 deletions(-) diff --git a/lineage/bigquery_query_history.py b/lineage/bigquery_query_history.py index 34fc018ae..98c963ebc 100644 --- a/lineage/bigquery_query_history.py +++ b/lineage/bigquery_query_history.py @@ -30,10 +30,9 @@ class BigQueryQueryHistory(QueryHistory): INFO_SCHEMA_END_TIME_UP_TO_CURRENT_TIMESTAMP = 'CURRENT_TIMESTAMP()' INFO_SCHEMA_END_TIME_UP_TO_PARAMETER = '@end_time' - def __init__(self, con, should_export_query_history: bool = True, ignore_schema: bool = False, - dataset: str = None) -> None: - self.dataset = dataset - super().__init__(con, should_export_query_history, ignore_schema) + def __init__(self, con, profile_database_name: str, profile_schema_name: str, + should_export_query_history: bool = True, ignore_schema: bool = False) -> None: + super().__init__(con, profile_database_name, profile_schema_name, should_export_query_history, ignore_schema) @classmethod def _build_history_query(cls, start_date: datetime, end_date: datetime, database_name: str, location: str) -> \ @@ -59,13 +58,17 @@ def _build_history_query(cls, start_date: datetime, end_date: datetime, database def _query_history_table(self, start_date: datetime, end_date: datetime) -> [Query]: database_name = self.get_database_name() - query, query_parameters = self._build_history_query(start_date, end_date, database_name, self._con.location) + schema_name = self.get_schema_name() + logger.debug(f"Pulling BigQuery history from database - {database_name} and schema - {schema_name}") + + query_text, query_parameters = self._build_history_query(start_date, end_date, database_name, + self._con.location) job_config = bigquery.QueryJobConfig( query_parameters=query_parameters ) - job = self._con.query(query, job_config=job_config) + job = self._con.query(query_text, job_config=job_config) logger.debug("Finished executing bigquery jobs history query") @@ -82,19 +85,13 @@ def _query_history_table(self, start_date: datetime, end_date: datetime) -> [Que query = BigQueryQuery(raw_query_text=row[0], query_context=query_context, profile_database_name=database_name, - profile_schema_name=self.get_schema_name()) + profile_schema_name=schema_name) queries.append(query) logger.debug("Finished fetching bigquery history job results") return queries - def get_database_name(self) -> str: - return self._con.project - - def get_schema_name(self) -> Optional[str]: - return self.dataset if not self._ignore_schema else None - def properties(self) -> dict: return {'platform_type': 'bigquery', 'ignore_schema': self._ignore_schema} \ No newline at end of file diff --git a/lineage/query_history.py b/lineage/query_history.py index 900a0456f..d5de34439 100644 --- a/lineage/query_history.py +++ b/lineage/query_history.py @@ -15,8 +15,11 @@ class QueryHistory(object): LATEST_QUERY_HISTORY_FILE = './latest_query_history.json' - def __init__(self, con, should_export_query_history: bool = True, ignore_schema: bool = False) -> None: + def __init__(self, con, profile_database_name: str, profile_schema_name: str, + should_export_query_history: bool 
= True, ignore_schema: bool = False) -> None: self._con = con + self._profile_database_name = profile_database_name + self._profile_schema_name = profile_schema_name self._should_export_query_history = should_export_query_history self._ignore_schema = ignore_schema @@ -65,10 +68,10 @@ def _query_history_table(self, start_date: datetime, end_date: datetime) -> [Que pass def get_database_name(self) -> str: - pass + return self._profile_database_name def get_schema_name(self) -> Optional[str]: - pass + return self._profile_schema_name if not self._ignore_schema else None def properties(self) -> dict: pass diff --git a/lineage/query_history_factory.py b/lineage/query_history_factory.py index fe8e44263..488e63d80 100644 --- a/lineage/query_history_factory.py +++ b/lineage/query_history_factory.py @@ -32,11 +32,12 @@ def create_query_history(self, credentials: Any, profile_data: dict) -> QueryHis **credentials.auth_args() ) - return SnowflakeQueryHistory(snowflake_con, self._export_query_history, self._ignore_schema, + return SnowflakeQueryHistory(snowflake_con, credentials.database, credentials.schema, + self._export_query_history, self._ignore_schema, profile_data.get('query_history_source')) elif credentials_type == 'bigquery': bigquery_client = get_bigquery_client(credentials) - return BigQueryQueryHistory(bigquery_client, self._export_query_history, self._ignore_schema, - profile_data.get('dataset')) + return BigQueryQueryHistory(bigquery_client, credentials.database, credentials.schema, + self._export_query_history, self._ignore_schema) else: raise ConfigError("Unsupported profile type") diff --git a/lineage/snowflake_query_history.py b/lineage/snowflake_query_history.py index 34becbba7..b70ba716f 100644 --- a/lineage/snowflake_query_history.py +++ b/lineage/snowflake_query_history.py @@ -50,10 +50,11 @@ class SnowflakeQueryHistory(QueryHistory): ACCOUNT_USAGE_END_TIME_UP_TO_PARAMETER = 'end_time <= :3' QUERY_HISTORY_SOURCE_ACCOUNT_USAGE = 'account_usage' - def __init__(self, con, should_export_query_history: bool = True, ignore_schema: bool = False, + def __init__(self, con, profile_database_name: str, profile_schema_name: str, + should_export_query_history: bool = True, ignore_schema: bool = False, query_history_source: str = None) -> None: self.query_history_source = query_history_source.strip().lower() if query_history_source is not None else None - super().__init__(con, should_export_query_history, ignore_schema) + super().__init__(con, profile_database_name, profile_schema_name, should_export_query_history, ignore_schema) @classmethod def _build_history_query(cls, start_date: datetime, end_date: datetime, database_name: str, @@ -87,10 +88,15 @@ def _build_history_query(cls, start_date: datetime, end_date: datetime, database def _query_history_table(self, start_date: datetime, end_date: datetime) -> [Query]: queries = [] + database_name = self.get_database_name() + schema_name = self.get_schema_name() + + logger.debug(f"Pulling snowflake query history from database - {database_name} and schema - {schema_name}") + with self._con.cursor() as cursor: - query, bindings = self._build_history_query(start_date, end_date, self.get_database_name(), - self.query_history_source) - cursor.execute(query, bindings) + query_text, bindings = self._build_history_query(start_date, end_date, database_name, + self.query_history_source) + cursor.execute(query_text, bindings) logger.debug("Finished executing snowflake history query") rows = cursor.fetchall() for row in rows: @@ -104,21 +110,15 @@ def 
_query_history_table(self, start_date: datetime, end_date: datetime) -> [Que query = SnowflakeQuery(raw_query_text=row[0], query_context=query_context, - profile_database_name=self.get_database_name(), - profile_schema_name=self.get_schema_name()) + profile_database_name=database_name, + profile_schema_name=schema_name) queries.append(query) logger.debug("Finished fetching snowflake history query results") return queries - def get_database_name(self) -> str: - return self._con.database - - def get_schema_name(self) -> Optional[str]: - return self._con.schema if not self._ignore_schema else None - def properties(self) -> dict: return {'platform_type': 'snowflake', 'query_history_source': self.query_history_source, - 'ignore_schema': self._ignore_schema} \ No newline at end of file + 'ignore_schema': self._ignore_schema} From f7ce2daeac5ab7860ac8907e9c8543fcd8a94f5b Mon Sep 17 00:00:00 2001 From: oravidov Date: Sun, 17 Oct 2021 13:07:57 +0300 Subject: [PATCH 2/4] Added run id to the anonymous tracking module --- lineage/tracking.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lineage/tracking.py b/lineage/tracking.py index 69acbcba9..1d29fe8aa 100644 --- a/lineage/tracking.py +++ b/lineage/tracking.py @@ -16,6 +16,7 @@ def __init__(self, profiles_dir: str, anonymous_usage_tracking: bool = True) -> self.api_key = None self.url = None self.do_not_track = anonymous_usage_tracking is False + self.run_id = str(uuid.uuid4()) def init(self): try: @@ -55,6 +56,11 @@ def send_event(self, name: str, properties: dict = None) -> None: if self.api_key is None or self.url is None or self.anonymous_user_id is None: return + if properties is None: + properties = dict() + + properties['run_id'] = self.run_id + posthog.api_key = self.api_key posthog.host = self.url posthog.capture(distinct_id=self.anonymous_user_id, event=name, properties=properties) From 3f8bdb40f3a60b68d67b8985c87837707f9ec85a Mon Sep 17 00:00:00 2001 From: oravidov Date: Sun, 17 Oct 2021 15:31:02 +0300 Subject: [PATCH 3/4] Fixed bug with table filter and added run properties to the anonymous tracking --- lineage/main.py | 15 +++++-------- lineage/tracking.py | 54 +++++++++++++++++++++++++++++++++------------ lineage/utils.py | 42 +++++++++++++++++++++++++++++++++++ 3 files changed, 87 insertions(+), 24 deletions(-) diff --git a/lineage/main.py b/lineage/main.py index e4e0d5a52..eb0db8659 100644 --- a/lineage/main.py +++ b/lineage/main.py @@ -1,7 +1,7 @@ import click from lineage.dbt_utils import extract_credentials_and_data_from_profiles -from lineage.tracking import AnonymousTracking +from lineage.tracking import track_cli_start, track_cli_end from lineage.exceptions import ConfigError from pyfiglet import Figlet from datetime import timedelta, date @@ -135,9 +135,7 @@ def main(start_date: datetime, end_date: datetime, profiles_dir: str, profile_na credentials, profile_data = extract_credentials_and_data_from_profiles(profiles_dir, profile_name) - anonymous_tracking = AnonymousTracking(profiles_dir, profile_data.get('anonymous_usage_tracking')) - anonymous_tracking.init() - anonymous_tracking.send_event('cli-start') + anonymous_tracking = track_cli_start(profiles_dir, profile_data) query_history = QueryHistoryFactory(export_query_history, ignore_schema).create_query_history(credentials, profile_data) @@ -148,8 +146,8 @@ def main(start_date: datetime, end_date: datetime, profiles_dir: str, profile_na lineage_graph.init_graph_from_query_list(queries) if table is not None: - table_resolver = 
TableResolver(profile_database_name=query_history.get_database_name(), - profile_schema_name=query_history.get_schema_name(), + table_resolver = TableResolver(profile_database_name=credentials.database, + profile_schema_name=credentials.schema, full_table_names=full_table_names) resolved_table_name = table_resolver.name_qualification(table) if resolved_table_name is None: @@ -159,10 +157,7 @@ def main(start_date: datetime, end_date: datetime, profiles_dir: str, profile_na lineage_graph.draw_graph(should_open_browser=open_browser) - lineage_properties = lineage_graph.properties() - lineage_properties.update(query_history.properties()) - - anonymous_tracking.send_event('cli-end', properties=lineage_properties) + track_cli_end(anonymous_tracking, lineage_graph.properties(), query_history.properties()) if __name__ == "__main__": diff --git a/lineage/tracking.py b/lineage/tracking.py index 1d29fe8aa..bc4927ebf 100644 --- a/lineage/tracking.py +++ b/lineage/tracking.py @@ -1,10 +1,12 @@ import uuid import os -from typing import Union +from typing import Union, Optional import requests from bs4 import BeautifulSoup import posthog +from lineage.utils import get_run_properties + class AnonymousTracking(object): ANONYMOUS_USER_ID_FILE = '.user_id' @@ -19,19 +21,16 @@ def __init__(self, profiles_dir: str, anonymous_usage_tracking: bool = True) -> self.run_id = str(uuid.uuid4()) def init(self): - try: - anonymous_user_id_file_name = os.path.join(self.profiles_dir, self.ANONYMOUS_USER_ID_FILE) - if os.path.exists(anonymous_user_id_file_name): - with open(anonymous_user_id_file_name, 'r') as anonymous_user_id_file: - self.anonymous_user_id = anonymous_user_id_file.read() - else: - self.anonymous_user_id = str(uuid.uuid4()) - with open(anonymous_user_id_file_name, 'w') as anonymous_user_id_file: - anonymous_user_id_file.write(self.anonymous_user_id) - - self.api_key, self.url = self._fetch_api_key_and_url() - except Exception: - pass + anonymous_user_id_file_name = os.path.join(self.profiles_dir, self.ANONYMOUS_USER_ID_FILE) + if os.path.exists(anonymous_user_id_file_name): + with open(anonymous_user_id_file_name, 'r') as anonymous_user_id_file: + self.anonymous_user_id = anonymous_user_id_file.read() + else: + self.anonymous_user_id = str(uuid.uuid4()) + with open(anonymous_user_id_file_name, 'w') as anonymous_user_id_file: + anonymous_user_id_file.write(self.anonymous_user_id) + + self.api_key, self.url = self._fetch_api_key_and_url() @classmethod def _fetch_api_key_and_url(cls) -> (Union[str, None], Union[str, None]): @@ -64,3 +63,30 @@ def send_event(self, name: str, properties: dict = None) -> None: posthog.api_key = self.api_key posthog.host = self.url posthog.capture(distinct_id=self.anonymous_user_id, event=name, properties=properties) + + +def track_cli_start(profiles_dir: str, profile_data: dict) -> Optional['AnonymousTracking']: + try: + anonymous_tracking = AnonymousTracking(profiles_dir, profile_data.get('anonymous_usage_tracking')) + anonymous_tracking.init() + cli_start_properties = dict() + cli_start_properties.update(get_run_properties()) + anonymous_tracking.send_event('cli-start', properties=cli_start_properties) + return anonymous_tracking + except Exception: + pass + return None + + +def track_cli_end(anonymous_tracking: AnonymousTracking, lineage_properties: dict, query_history_properties: dict) \ + -> None: + try: + if anonymous_tracking is None: + return + + cli_end_properties = dict() + cli_end_properties.update(lineage_properties) + 
cli_end_properties.update(query_history_properties) + anonymous_tracking.send_event('cli-end', properties=cli_end_properties) + except Exception: + pass diff --git a/lineage/utils.py b/lineage/utils.py index bf63ada04..ec5e57976 100644 --- a/lineage/utils.py +++ b/lineage/utils.py @@ -1,9 +1,12 @@ import os import logging import sys +from pathlib import Path +import click FORMATTER = logging.Formatter("%(asctime)s — %(name)s — %(levelname)s — %(message)s") LOG_FILE = "edl.log" +DBT_DIR = ".dbt" def is_flight_mode_on() -> bool: @@ -43,3 +46,42 @@ def get_logger(logger_name): # with this pattern, it's rarely necessary to propagate the error up to parent logger.propagate = False return logger + + +def is_dbt_installed() -> bool: + if os.path.exists(os.path.join(Path.home(), DBT_DIR)): + return True + return False + + +def get_run_properties() -> dict: + + click_context = click.get_current_context() + if click_context is None: + return dict() + + params = click_context.params + if params is None: + return dict() + + start_date = params.get('start_date') + end_date = params.get('end_date') + is_filtered = params.get('table') is not None + + start_date_str = None + if start_date is not None: + start_date_str = start_date.isoformat() + + end_date_str = None + if end_date is not None: + end_date_str = end_date.isoformat() + + return {'start_date': start_date_str, + 'end_date': end_date_str, + 'is_filtered': is_filtered, + 'open_browser': params.get('open_browser'), + 'export_query_history': params.get('export_query_history'), + 'full_table_names': params.get('full_table_names'), + 'direction': params.get('direction'), + 'depth': params.get('depth'), + 'dbt_installed': is_dbt_installed()} From b188ba06d302fbf2fd36464e55b508aa76654426 Mon Sep 17 00:00:00 2001 From: oravidov Date: Sun, 17 Oct 2021 15:40:41 +0300 Subject: [PATCH 4/4] Bumped version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 5f0bb78c6..8babf972c 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ setup( name='elementary-lineage', description='Presenting data lineage based on your data warehouse query history', - version='0.0.14', + version='0.0.15', packages=find_packages(), python_requires='>=3.6.2', entry_points='''
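
Reviewer note (not part of the patch series): a minimal sketch of what the PATCH 1/4 refactor guarantees. The dbt profile's database and schema are now injected into the QueryHistory base class at construction time and returned by get_database_name()/get_schema_name(), instead of being read back from the live connection, so a value reported by the Snowflake session (or the BigQuery client) can no longer override the profile. The constructor signature and getter behavior below mirror the patched lineage/query_history.py; FakeSnowflakeCon and the literal profile values are hypothetical stand-ins used only for illustration.

from typing import Optional


class QueryHistory:
    # Mirrors the patched base class: profile names are stored at construction time.
    def __init__(self, con, profile_database_name: str, profile_schema_name: str,
                 should_export_query_history: bool = True, ignore_schema: bool = False) -> None:
        self._con = con
        self._profile_database_name = profile_database_name
        self._profile_schema_name = profile_schema_name
        self._should_export_query_history = should_export_query_history
        self._ignore_schema = ignore_schema

    def get_database_name(self) -> str:
        # Always the dbt profile's database, never self._con.database
        return self._profile_database_name

    def get_schema_name(self) -> Optional[str]:
        return self._profile_schema_name if not self._ignore_schema else None


class FakeSnowflakeCon:
    # Hypothetical connection whose session reports a different database/schema
    database = 'SOME_OTHER_DB'
    schema = 'PUBLIC'


history = QueryHistory(FakeSnowflakeCon(), profile_database_name='analytics',
                       profile_schema_name='marts')
assert history.get_database_name() == 'analytics'  # the profile wins, not the connection
assert history.get_schema_name() == 'marts'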