Merge pull request #17 from elementary-data/small_fixes
Small fixes
oravi authored Oct 17, 2021
2 parents 77a20c6 + b188ba0 commit 2ae0b50
Showing 8 changed files with 128 additions and 58 deletions.
23 changes: 10 additions & 13 deletions lineage/bigquery_query_history.py
@@ -30,10 +30,9 @@ class BigQueryQueryHistory(QueryHistory):
INFO_SCHEMA_END_TIME_UP_TO_CURRENT_TIMESTAMP = 'CURRENT_TIMESTAMP()'
INFO_SCHEMA_END_TIME_UP_TO_PARAMETER = '@end_time'

-    def __init__(self, con, should_export_query_history: bool = True, ignore_schema: bool = False,
-                 dataset: str = None) -> None:
-        self.dataset = dataset
-        super().__init__(con, should_export_query_history, ignore_schema)
+    def __init__(self, con, profile_database_name: str, profile_schema_name: str,
+                 should_export_query_history: bool = True, ignore_schema: bool = False) -> None:
+        super().__init__(con, profile_database_name, profile_schema_name, should_export_query_history, ignore_schema)

@classmethod
def _build_history_query(cls, start_date: datetime, end_date: datetime, database_name: str, location: str) -> \
@@ -59,13 +58,17 @@ def _build_history_query(cls, start_date: datetime, end_date: datetime, database

def _query_history_table(self, start_date: datetime, end_date: datetime) -> [Query]:
database_name = self.get_database_name()
-        query, query_parameters = self._build_history_query(start_date, end_date, database_name, self._con.location)
+        schema_name = self.get_schema_name()
+        logger.debug(f"Pulling BigQuery history from database - {database_name} and schema - {schema_name}")
+
+        query_text, query_parameters = self._build_history_query(start_date, end_date, database_name,
+                                                                 self._con.location)

job_config = bigquery.QueryJobConfig(
query_parameters=query_parameters
)

-        job = self._con.query(query, job_config=job_config)
+        job = self._con.query(query_text, job_config=job_config)

logger.debug("Finished executing bigquery jobs history query")

@@ -82,19 +85,13 @@ def _query_history_table(self, start_date: datetime, end_date: datetime) -> [Que
query = BigQueryQuery(raw_query_text=row[0],
query_context=query_context,
profile_database_name=database_name,
-                                  profile_schema_name=self.get_schema_name())
+                                  profile_schema_name=schema_name)

queries.append(query)
logger.debug("Finished fetching bigquery history job results")

return queries

-    def get_database_name(self) -> str:
-        return self._con.project
-
-    def get_schema_name(self) -> Optional[str]:
-        return self.dataset if not self._ignore_schema else None
-
def properties(self) -> dict:
return {'platform_type': 'bigquery',
'ignore_schema': self._ignore_schema}
15 changes: 5 additions & 10 deletions lineage/main.py
@@ -1,7 +1,7 @@
import click

from lineage.dbt_utils import extract_credentials_and_data_from_profiles
-from lineage.tracking import AnonymousTracking
+from lineage.tracking import track_cli_start, track_cli_end
from lineage.exceptions import ConfigError
from pyfiglet import Figlet
from datetime import timedelta, date
@@ -135,9 +135,7 @@ def main(start_date: datetime, end_date: datetime, profiles_dir: str, profile_na

credentials, profile_data = extract_credentials_and_data_from_profiles(profiles_dir, profile_name)

-    anonymous_tracking = AnonymousTracking(profiles_dir, profile_data.get('anonymous_usage_tracking'))
-    anonymous_tracking.init()
-    anonymous_tracking.send_event('cli-start')
+    anonymous_tracking = track_cli_start(profiles_dir, profile_data)

query_history = QueryHistoryFactory(export_query_history, ignore_schema).create_query_history(credentials,
profile_data)
@@ -148,8 +146,8 @@ def main(start_date: datetime, end_date: datetime, profiles_dir: str, profile_na
lineage_graph.init_graph_from_query_list(queries)

if table is not None:
-        table_resolver = TableResolver(profile_database_name=query_history.get_database_name(),
-                                       profile_schema_name=query_history.get_schema_name(),
+        table_resolver = TableResolver(profile_database_name=credentials.database,
+                                       profile_schema_name=credentials.schema,
full_table_names=full_table_names)
resolved_table_name = table_resolver.name_qualification(table)
if resolved_table_name is None:
@@ -159,10 +157,7 @@ def main(start_date: datetime, end_date: datetime, profiles_dir: str, profile_na

lineage_graph.draw_graph(should_open_browser=open_browser)

-    lineage_properties = lineage_graph.properties()
-    lineage_properties.update(query_history.properties())
-
-    anonymous_tracking.send_event('cli-end', properties=lineage_properties)
+    track_cli_end(anonymous_tracking, lineage_graph.properties(), query_history.properties())


if __name__ == "__main__":
9 changes: 6 additions & 3 deletions lineage/query_history.py
@@ -15,8 +15,11 @@ class QueryHistory(object):

LATEST_QUERY_HISTORY_FILE = './latest_query_history.json'

-    def __init__(self, con, should_export_query_history: bool = True, ignore_schema: bool = False) -> None:
+    def __init__(self, con, profile_database_name: str, profile_schema_name: str,
+                 should_export_query_history: bool = True, ignore_schema: bool = False) -> None:
self._con = con
+        self._profile_database_name = profile_database_name
+        self._profile_schema_name = profile_schema_name
self._should_export_query_history = should_export_query_history
self._ignore_schema = ignore_schema

@@ -65,10 +68,10 @@ def _query_history_table(self, start_date: datetime, end_date: datetime) -> [Que
pass

def get_database_name(self) -> str:
-        pass
+        return self._profile_database_name

def get_schema_name(self) -> Optional[str]:
-        pass
+        return self._profile_schema_name if not self._ignore_schema else None

def properties(self) -> dict:
pass
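
The base class now owns the profile's database and schema, and `ignore_schema` masks the schema at the getter. Below is a minimal, self-contained sketch of that behavior, trimmed from the diff above; the `None` connection and the 'analytics'/'elementary' values are made-up placeholders, not part of this commit:

```python
from typing import Optional


class QueryHistory(object):
    # Trimmed copy of the refactored constructor and getters shown in this diff.
    def __init__(self, con, profile_database_name: str, profile_schema_name: str,
                 should_export_query_history: bool = True, ignore_schema: bool = False) -> None:
        self._con = con
        self._profile_database_name = profile_database_name
        self._profile_schema_name = profile_schema_name
        self._should_export_query_history = should_export_query_history
        self._ignore_schema = ignore_schema

    def get_database_name(self) -> str:
        return self._profile_database_name

    def get_schema_name(self) -> Optional[str]:
        return self._profile_schema_name if not self._ignore_schema else None


history = QueryHistory(con=None, profile_database_name='analytics',
                       profile_schema_name='elementary', ignore_schema=True)
print(history.get_database_name())  # analytics
print(history.get_schema_name())    # None - the schema is masked when ignore_schema is True
```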
7 changes: 4 additions & 3 deletions lineage/query_history_factory.py
@@ -32,11 +32,12 @@ def create_query_history(self, credentials: Any, profile_data: dict) -> QueryHis
**credentials.auth_args()
)

-            return SnowflakeQueryHistory(snowflake_con, self._export_query_history, self._ignore_schema,
+            return SnowflakeQueryHistory(snowflake_con, credentials.database, credentials.schema,
+                                         self._export_query_history, self._ignore_schema,
profile_data.get('query_history_source'))
elif credentials_type == 'bigquery':
bigquery_client = get_bigquery_client(credentials)
-            return BigQueryQueryHistory(bigquery_client, self._export_query_history, self._ignore_schema,
-                                        profile_data.get('dataset'))
+            return BigQueryQueryHistory(bigquery_client, credentials.database, credentials.schema,
+                                        self._export_query_history, self._ignore_schema)
else:
raise ConfigError("Unsupported profile type")
28 changes: 14 additions & 14 deletions lineage/snowflake_query_history.py
@@ -50,10 +50,11 @@ class SnowflakeQueryHistory(QueryHistory):
ACCOUNT_USAGE_END_TIME_UP_TO_PARAMETER = 'end_time <= :3'
QUERY_HISTORY_SOURCE_ACCOUNT_USAGE = 'account_usage'

-    def __init__(self, con, should_export_query_history: bool = True, ignore_schema: bool = False,
+    def __init__(self, con, profile_database_name: str, profile_schema_name: str,
+                 should_export_query_history: bool = True, ignore_schema: bool = False,
query_history_source: str = None) -> None:
self.query_history_source = query_history_source.strip().lower() if query_history_source is not None else None
-        super().__init__(con, should_export_query_history, ignore_schema)
+        super().__init__(con, profile_database_name, profile_schema_name, should_export_query_history, ignore_schema)

@classmethod
def _build_history_query(cls, start_date: datetime, end_date: datetime, database_name: str,
@@ -87,10 +88,15 @@ def _build_history_query(cls, start_date: datetime, end_date: datetime, database

def _query_history_table(self, start_date: datetime, end_date: datetime) -> [Query]:
queries = []
+        database_name = self.get_database_name()
+        schema_name = self.get_schema_name()
+
+        logger.debug(f"Pulling snowflake query history from database - {database_name} and schema - {schema_name}")
+
with self._con.cursor() as cursor:
-            query, bindings = self._build_history_query(start_date, end_date, self.get_database_name(),
-                                                        self.query_history_source)
-            cursor.execute(query, bindings)
+            query_text, bindings = self._build_history_query(start_date, end_date, database_name,
+                                                             self.query_history_source)
+            cursor.execute(query_text, bindings)
logger.debug("Finished executing snowflake history query")
rows = cursor.fetchall()
for row in rows:
@@ -104,21 +110,15 @@ def _query_history_table(self, start_date: datetime, end_date: datetime) -> [Que

query = SnowflakeQuery(raw_query_text=row[0],
query_context=query_context,
-                                   profile_database_name=self.get_database_name(),
-                                   profile_schema_name=self.get_schema_name())
+                                   profile_database_name=database_name,
+                                   profile_schema_name=schema_name)

queries.append(query)
logger.debug("Finished fetching snowflake history query results")

return queries

-    def get_database_name(self) -> str:
-        return self._con.database
-
-    def get_schema_name(self) -> Optional[str]:
-        return self._con.schema if not self._ignore_schema else None
-
def properties(self) -> dict:
return {'platform_type': 'snowflake',
'query_history_source': self.query_history_source,
-                'ignore_schema': self._ignore_schema}
+                'ignore_schema': self._ignore_schema}
60 changes: 46 additions & 14 deletions lineage/tracking.py
@@ -1,10 +1,12 @@
import uuid
import os
-from typing import Union
+from typing import Union, Optional
import requests
from bs4 import BeautifulSoup
import posthog

+from lineage.utils import get_run_properties
+

class AnonymousTracking(object):
ANONYMOUS_USER_ID_FILE = '.user_id'
@@ -16,21 +18,19 @@ def __init__(self, profiles_dir: str, anonymous_usage_tracking: bool = True) ->
self.api_key = None
self.url = None
self.do_not_track = anonymous_usage_tracking is False
+        self.run_id = str(uuid.uuid4())

def init(self):
-        try:
-            anonymous_user_id_file_name = os.path.join(self.profiles_dir, self.ANONYMOUS_USER_ID_FILE)
-            if os.path.exists(anonymous_user_id_file_name):
-                with open(anonymous_user_id_file_name, 'r') as anonymous_user_id_file:
-                    self.anonymous_user_id = anonymous_user_id_file.read()
-            else:
-                self.anonymous_user_id = str(uuid.uuid4())
-                with open(anonymous_user_id_file_name, 'w') as anonymous_user_id_file:
-                    anonymous_user_id_file.write(self.anonymous_user_id)
-
-            self.api_key, self.url = self._fetch_api_key_and_url()
-        except Exception:
-            pass
+        anonymous_user_id_file_name = os.path.join(self.profiles_dir, self.ANONYMOUS_USER_ID_FILE)
+        if os.path.exists(anonymous_user_id_file_name):
+            with open(anonymous_user_id_file_name, 'r') as anonymous_user_id_file:
+                self.anonymous_user_id = anonymous_user_id_file.read()
+        else:
+            self.anonymous_user_id = str(uuid.uuid4())
+            with open(anonymous_user_id_file_name, 'w') as anonymous_user_id_file:
+                anonymous_user_id_file.write(self.anonymous_user_id)
+
+        self.api_key, self.url = self._fetch_api_key_and_url()

@classmethod
def _fetch_api_key_and_url(cls) -> (Union[str, None], Union[str, None]):
@@ -55,6 +55,38 @@ def send_event(self, name: str, properties: dict = None) -> None:
if self.api_key is None or self.url is None or self.anonymous_user_id is None:
return

+        if properties is None:
+            properties = dict()
+
+        properties['run_id'] = self.run_id
+
posthog.api_key = self.api_key
posthog.host = self.url
posthog.capture(distinct_id=self.anonymous_user_id, event=name, properties=properties)


+def track_cli_start(profiles_dir: str, profile_data: dict) -> Optional['AnonymousTracking']:
+    try:
+        anonymous_tracking = AnonymousTracking(profiles_dir, profile_data.get('anonymous_usage_tracking'))
+        anonymous_tracking.init()
+        cli_start_properties = dict()
+        cli_start_properties.update(get_run_properties())
+        anonymous_tracking.send_event('cli-start', properties=cli_start_properties)
+        return anonymous_tracking
+    except Exception:
+        pass
+    return None
+
+
+def track_cli_end(anonymous_tracking: AnonymousTracking, lineage_properties: dict, query_history_properties: dict) \
+        -> None:
+    try:
+        if anonymous_tracking is None:
+            return
+
+        cli_end_properties = dict()
+        cli_end_properties.update(lineage_properties)
+        cli_end_properties.update(query_history_properties)
+        anonymous_tracking.send_event('cli-end', properties=cli_end_properties)
+    except Exception:
+        pass
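
A rough sketch of the call pattern these helpers enable, mirroring the main.py change above; the profiles path and the two property dicts are placeholders, and both helpers swallow exceptions so tracking failures never reach the CLI:

```python
from lineage.tracking import track_cli_start, track_cli_end

profiles_dir = '~/.dbt'                            # placeholder path
profile_data = {'anonymous_usage_tracking': True}  # subset of the dbt profile data

# Returns an AnonymousTracking instance, or None if anything goes wrong
# (e.g. outside a click command, get_run_properties() raises and the
# exception is swallowed).
anonymous_tracking = track_cli_start(profiles_dir, profile_data)

# ... build the lineage graph, pull query history ...

# In main.py these dicts come from lineage_graph.properties() and
# query_history.properties(); placeholder values shown here.
track_cli_end(anonymous_tracking,
              {'nodes_count': 12},
              {'platform_type': 'bigquery', 'ignore_schema': False})
```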
42 changes: 42 additions & 0 deletions lineage/utils.py
@@ -1,9 +1,12 @@
import os
import logging
import sys
+from pathlib import Path
+import click

FORMATTER = logging.Formatter("%(asctime)s — %(name)s — %(levelname)s — %(message)s")
LOG_FILE = "edl.log"
+DBT_DIR = ".dbt"


def is_flight_mode_on() -> bool:
@@ -43,3 +46,42 @@ def get_logger(logger_name):
# with this pattern, it's rarely necessary to propagate the error up to parent
logger.propagate = False
return logger


+def is_dbt_installed() -> bool:
+    if os.path.exists(os.path.join(Path.home(), DBT_DIR)):
+        return True
+    return False
+
+
+def get_run_properties() -> dict:
+
+    click_context = click.get_current_context()
+    if click_context is None:
+        return dict()
+
+    params = click_context.params
+    if params is None:
+        return dict()
+
+    start_date = params.get('start_date')
+    end_date = params.get('end_date')
+    is_filtered = params.get('table') is not None
+
+    start_date_str = None
+    if start_date is not None:
+        start_date_str = start_date.isoformat()
+
+    end_date_str = None
+    if end_date is not None:
+        end_date_str = end_date.isoformat()
+
+    return {'start_date': start_date_str,
+            'end_date': end_date_str,
+            'is_filtered': is_filtered,
+            'open_browser': params.get('open_browser'),
+            'export_query_history': params.get('export_query_history'),
+            'full_table_names': params.get('full_table_names'),
+            'direction': params.get('direction'),
+            'depth': params.get('depth'),
+            'dbt_installed': is_dbt_installed()}
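
get_run_properties() reads the active click context, so it only returns meaningful values from inside a running command. A hypothetical, trimmed-down command for illustration: the real CLI defines more options, the option names below only cover a subset of the params this function reads, and the sample output is indicative only:

```python
import click

from lineage.utils import get_run_properties


@click.command()
@click.option('--start-date', type=click.DateTime(formats=['%Y-%m-%d']), default=None)
@click.option('--end-date', type=click.DateTime(formats=['%Y-%m-%d']), default=None)
@click.option('--table', type=str, default=None)
@click.option('--open-browser', type=bool, default=True)
def demo(start_date, end_date, table, open_browser) -> None:
    # Inside the command there is an active click context, so the params are visible.
    print(get_run_properties())
    # e.g. {'start_date': '2021-10-01T00:00:00', 'end_date': None, 'is_filtered': False,
    #       'open_browser': True, 'export_query_history': None, 'full_table_names': None,
    #       'direction': None, 'depth': None, 'dbt_installed': False}


if __name__ == '__main__':
    demo()
```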
2 changes: 1 addition & 1 deletion setup.py
@@ -20,7 +20,7 @@
setup(
name='elementary-lineage',
description='Presenting data lineage based on your data warehouse query history',
-    version='0.0.14',
+    version='0.0.15',
packages=find_packages(),
python_requires='>=3.6.2',
entry_points='''
