Skip to content

Commit

Permalink
fix: Allow BigQuery Usage Extractor to extract usage for views (#399)
Browse files Browse the repository at this point in the history
* Fixing BigQuery Usage Extractor to extract usage for views

Signed-off-by: Abhinay Kathuria <abhinay.kathuria@rea-group.com>

Signed-off-by: Abhinay Kathuria <abhinay@kathuria.com.au>

* Fix create records

Signed-off-by: Abhinay Kathuria <abhinay@kathuria.com.au>

* Fix Linting

Signed-off-by: Abhinay Kathuria <abhinay@kathuria.com.au>

Signed-off-by: Abhinay Kathuria <abhinay@kathuria.com.au>

* Add type annotations

Signed-off-by: Abhinay Kathuria <abhinay.kathuria@rea-group.com>
  • Loading branch information
abhinay04 authored Nov 3, 2020
1 parent 052ce5d commit 8779229
Showing 1 changed file with 44 additions and 30 deletions.
74 changes: 44 additions & 30 deletions databuilder/extractor/bigquery_usage_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from time import sleep

from pyhocon import ConfigTree
from typing import Any, Iterator, Dict, Optional, Tuple
from typing import Any, Iterator, Dict, Optional, Tuple, List

from databuilder.extractor.base_bigquery_extractor import BaseBigQueryExtractor

Expand Down Expand Up @@ -64,36 +64,50 @@ def _count_usage(self) -> None: # noqa: C901
continue

email = entry['protoPayload']['authenticationInfo']['principalEmail']
refTables = job['jobStatistics'].get('referencedTables', None)

if not refTables:
# Query results can be cached and if the source tables remain untouched,
# bigquery will return it from a 24 hour cache result instead. In that
# case, referencedTables has been observed to be empty:
# https://cloud.google.com/logging/docs/reference/audit/bigquery/rest/Shared.Types/AuditData#JobStatistics
continue
# Query results can be cached: if the source tables remain untouched,
# BigQuery returns the result from its 24-hour cache instead. In that
# case, referencedTables has been observed to be empty:
# https://cloud.google.com/logging/docs/reference/audit/bigquery/rest/Shared.Types/AuditData#JobStatistics

# if email filter is provided, only the email matched with filter will be recorded.
if self.email_pattern:
if not re.match(self.email_pattern, email):
# the usage account does not match the email pattern
continue

numTablesProcessed = job['jobStatistics']['totalTablesProcessed']
if len(refTables) != numTablesProcessed:
LOGGER.warn('The number of tables listed in job {job_id} is not consistent'
.format(job_id=job['jobName']['jobId']))

for refTable in refTables:
key = TableColumnUsageTuple(database='bigquery',
cluster=refTable['projectId'],
schema=refTable['datasetId'],
table=refTable['tableId'],
column='*',
email=email)

new_count = self.table_usage_counts.get(key, 0) + 1
self.table_usage_counts[key] = new_count
refTables = job['jobStatistics'].get('referencedTables', None)
if refTables:
if 'totalTablesProcessed' in job['jobStatistics']:
self._create_records(
refTables,
job['jobStatistics']['totalTablesProcessed'], email,
job['jobName']['jobId'])

refViews = job['jobStatistics'].get('referencedViews', None)
if refViews:
if 'totalViewsProcessed' in job['jobStatistics']:
self._create_records(
refViews, job['jobStatistics']['totalViewsProcessed'],
email, job['jobName']['jobId'])

def _create_records(self, refResources: List[dict], resourcesProcessed: int, email: str,
                    jobId: str) -> None:
    """
    Record one usage per referenced resource for the given user.

    For every entry of ``refResources`` the counter stored in
    ``self.table_usage_counts`` is incremented, keyed by a
    ``TableColumnUsageTuple`` with database ``'bigquery'`` and column ``'*'``.

    Nothing is recorded when:
      * ``self.email_pattern`` is set and ``email`` does not match it, or
      * ``len(refResources)`` differs from ``resourcesProcessed`` (a warning
        is logged in that case).

    :param refResources: referenced resources (the callers pass referenced
        tables or referenced views); each dict must provide the
        ``projectId``, ``datasetId`` and ``tableId`` keys
    :param resourcesProcessed: resource count reported by the job
        statistics, compared against ``len(refResources)``
    :param email: principal email the usage is attributed to
    :param jobId: job identifier, used only in the warning message
    :return: None
    """
    # If an email filter is provided, only emails matching it are recorded.
    if self.email_pattern and not re.match(self.email_pattern, email):
        # The usage account does not match the email pattern.
        return

    if len(refResources) != resourcesProcessed:
        # Logger.warn is a deprecated alias; Logger.warning is the supported
        # spelling. The emitted message text is unchanged.
        LOGGER.warning('The number of tables listed in job %s is not consistent', jobId)
        return

    for refResource in refResources:
        key = TableColumnUsageTuple(database='bigquery',
                                    cluster=refResource['projectId'],
                                    schema=refResource['datasetId'],
                                    table=refResource['tableId'],
                                    column='*',
                                    email=email)

        self.table_usage_counts[key] = self.table_usage_counts.get(key, 0) + 1

def _retrieve_records(self) -> Iterator[Optional[Dict]]:
"""
Expand Down

0 comments on commit 8779229

Please sign in to comment.