Skip to content

Commit 420c82c

Browse files
authored
BQ write and query jobs labeling (#381)
* BQ write and query jobs labeling * Fixed failing test * QueryJobConfig.labels instead of job_labels * Changelog and docs
1 parent fcd1226 commit 420c82c

File tree

6 files changed

+67
-19
lines changed

6 files changed

+67
-19
lines changed

CHANGELOG.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
# BigFlow changelog
22

3+
## Version 1.10.0
4+
5+
### Added
6+
7+
* BigQuery query job labeling for collect and write operations. Labels are passed via `job_labels` dict argument in `DatasetConfiguration` and `DatasetManager`.
8+
39
## Version 1.9.0
410

5-
### Changes
11+
### Changed
612

713
* Switched from Google Container Registry to Artifact Registry. Made `-r`/`--docker-repository` common for all deploy commands. Build and deploy commands authenticate to the Docker repository taken from `deployment_config.py` or CLI arguments, instead of hardcoded `https://eu.gcr.io`.
814

915
## Version 1.8.0
1016

11-
### Changes
17+
### Changed
1218

1319
* Bumped basic dependencies: Apache Beam 2.48.0, google-cloud-bigtable 2.17.0, google-cloud-language 2.10.0, google-cloud-storage 2.11.2, among others (#374).
1420
* Added the `env_variable` argument to `bigflow.Workflow` which enables to change a name of the variable used to obtain environment name (#365).

bigflow/bigquery/dataset_configuration.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Dict, List
1+
from typing import Dict
22

33
from ..configuration import Config
44
from .interface import Dataset
@@ -16,14 +16,16 @@ def __init__(self,
1616
is_master: bool = True,
1717
is_default: bool = True,
1818
tables_labels: Dict[str, Dict[str, str]] = None,
19-
dataset_labels: Dict[str, str] = None):
19+
dataset_labels: Dict[str, str] = None,
20+
job_labels: Dict[str, str] = None):
2021
all_properties = (properties or {}).copy()
2122
all_properties['project_id'] = project_id
2223
all_properties['dataset_name'] = dataset_name
2324
all_properties['internal_tables'] = internal_tables or []
2425
all_properties['external_tables'] = external_tables or {}
2526
all_properties['tables_labels'] = tables_labels or []
2627
all_properties['dataset_labels'] = dataset_labels or []
28+
all_properties['job_labels'] = job_labels or []
2729

2830
self.delegate = Config(name=env, properties=all_properties, is_master=is_master, is_default=is_default)
2931

@@ -36,7 +38,8 @@ def add_configuration(self,
3638
properties: dict = None,
3739
is_default: bool = False,
3840
tables_labels: Dict[str, Dict[str, str]] = None,
39-
dataset_labels: Dict[str, str] = None):
41+
dataset_labels: Dict[str, str] = None,
42+
job_labels: Dict[str, str] = None):
4043

4144
all_properties = (properties or {}).copy()
4245

@@ -57,6 +60,9 @@ def add_configuration(self,
5760
if dataset_labels:
5861
all_properties['dataset_labels'] = dataset_labels
5962

63+
if job_labels:
64+
all_properties['job_labels'] = job_labels
65+
6066
self.delegate.add_configuration(env, all_properties, is_default=is_default)
6167
return self
6268

@@ -68,7 +74,8 @@ def create_dataset_manager(self, env: str = None) -> Dataset:
6874
external_tables=self.resolve_external_tables(env),
6975
extras=self.resolve_extra_properties(env),
7076
tables_labels=self.resolve_tables_labels(env),
71-
dataset_labels=self.resolve_dataset_labels(env))
77+
dataset_labels=self.resolve_dataset_labels(env),
78+
job_labels=self.resolve_job_labels(env))
7279

7380
def resolve_extra_properties(self, env: str = None):
7481
return {k: v for (k, v) in self.resolve(env).items() if self._is_extra_property(k)}
@@ -103,5 +110,8 @@ def resolve_tables_labels(self, env: str = None) -> Dict[str, Dict[str, str]]:
103110
def resolve_dataset_labels(self, env: str = None) -> Dict[str, str]:
104111
return self.resolve_property('dataset_labels', env)
105112

113+
def resolve_job_labels(self, env: str = None) -> Dict[str, str]:
114+
return self.resolve_property('job_labels', env)
115+
106116
def _is_extra_property(self, property_name) -> bool:
107-
return property_name not in ['project_id','dataset_name','internal_tables','external_tables', 'env', 'dataset_labels', 'tables_labels']
117+
return property_name not in ['project_id','dataset_name','internal_tables','external_tables', 'env', 'dataset_labels', 'tables_labels', 'job_labels']

bigflow/bigquery/dataset_manager.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -265,12 +265,14 @@ class DatasetManager(object):
265265
def __init__(self,
266266
bigquery_client: 'google.cloud.bigquery.Client',
267267
dataset: 'google.cloud.bigquery.Dataset',
268-
logger: logging.Logger):
268+
logger: logging.Logger,
269+
job_labels: Dict[str, str] | None = None):
269270
from google.cloud import bigquery
270271
self.bigquery_client: bigquery.Client = bigquery_client
271272
self.dataset = dataset
272273
self.dataset_id = dataset.full_dataset_id.replace(':', '.')
273274
self.logger = logger
275+
self.job_labels = job_labels
274276

275277
def write_tmp(self, table_id: str, sql: str) -> 'google.cloud.bigquery.table.RowIterator':
276278
return self.write(table_id, sql, 'WRITE_TRUNCATE')
@@ -285,6 +287,9 @@ def write(self, table_id: str, sql: str, mode: str) -> 'google.cloud.bigquery.ta
285287
job_config.destination = table_id
286288
job_config.write_disposition = mode
287289

290+
if self.job_labels:
291+
job_config.labels = self.job_labels
292+
288293
job = self.bigquery_client.query(sql, job_config=job_config)
289294
return job.result()
290295

@@ -314,7 +319,13 @@ def create_table(self, create_query: str) -> 'google.cloud.bigquery.table.RowIte
314319
return job.result()
315320

316321
def collect(self, sql: str) -> 'pandas.DataFrame':
317-
return self._query(sql).to_dataframe()
322+
from google.cloud import bigquery
323+
job_config = bigquery.QueryJobConfig()
324+
325+
if self.job_labels:
326+
job_config.labels = self.job_labels
327+
328+
return self._query(sql, job_config=job_config).to_dataframe()
318329

319330
def collect_list(
320331
self,
@@ -482,7 +493,8 @@ def create_dataset_manager(
482493
location: str = DEFAULT_LOCATION,
483494
logger: Logger | None = None,
484495
tables_labels: Dict[str, Dict[str, str]] | None = None,
485-
dataset_labels: Dict[str, str] | None = None
496+
dataset_labels: Dict[str, str] | None = None,
497+
job_labels: Dict[str, str] | None = None
486498
) -> tp.Tuple[str, PartitionedDatasetManager]:
487499
"""
488500
Dataset manager factory.
@@ -501,6 +513,7 @@ def create_dataset_manager(
501513
:param logger: custom logger.
502514
:param tables_labels: Dict with key as table_name and value as list of key/valued labels.
503515
:param dataset_labels: Dict with key/valued labels.
516+
:param job_labels: Dict with key/valued labels.
504517
:return: tuple (full dataset ID, dataset manager).
505518
"""
506519
dataset_name = dataset_name or random_uuid(suffix='_test_case')
@@ -516,6 +529,6 @@ def create_dataset_manager(
516529

517530
upsert_tables_labels(dataset_name, tables_labels, client)
518531

519-
core_dataset_manager = DatasetManager(client, dataset, logger)
532+
core_dataset_manager = DatasetManager(client, dataset, logger, job_labels)
520533
templated_dataset_manager = TemplatedDatasetManager(core_dataset_manager, internal_tables, external_tables, extras, runtime)
521534
return dataset.full_dataset_id.replace(':', '.'), PartitionedDatasetManager(templated_dataset_manager, get_partition_from_run_datetime_or_none(runtime))

bigflow/bigquery/interactive.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def decorator(standard_component):
5252

5353

5454
class InteractiveDatasetManager(Dataset):
55-
"""Let's you run operations on a dataset, without the need of creating a component."""
55+
"""Lets you run operations on a dataset, without the need of creating a component."""
5656

5757
def __init__(self,
5858
project_id: str,
@@ -63,7 +63,8 @@ def __init__(self,
6363
extras: Dict = None,
6464
location: str = DEFAULT_LOCATION,
6565
tables_labels: Dict[str, Dict[str, str]] = None,
66-
dataset_labels: Dict[str, str] = None):
66+
dataset_labels: Dict[str, str] = None,
67+
job_labels: Dict[str, str] = None):
6768
self.config = DatasetConfigInternal(
6869
project_id=project_id,
6970
dataset_name=dataset_name,
@@ -73,7 +74,8 @@ def __init__(self,
7374
extras=extras,
7475
location=location,
7576
tables_labels=tables_labels,
76-
dataset_labels=dataset_labels)
77+
dataset_labels=dataset_labels,
78+
job_labels=job_labels)
7779
logger.debug("Create InteractiveDatasetManager, config %s", self.config._as_dict())
7880

7981
def write_truncate(self, table_name, sql, partitioned=True):
@@ -488,7 +490,8 @@ def __init__(self,
488490
extras=None,
489491
location=DEFAULT_LOCATION,
490492
tables_labels: Dict[str, Dict[str, str]] = None,
491-
dataset_labels: Dict[str, str] = None
493+
dataset_labels: Dict[str, str] = None,
494+
job_labels: Dict[str, str] = None
492495
):
493496
self.project_id = project_id
494497
self.dataset_name = dataset_name
@@ -499,6 +502,7 @@ def __init__(self,
499502
self.location = location
500503
self.tables_labels = tables_labels or {}
501504
self.dataset_labels = dataset_labels or {}
505+
self.job_labels = job_labels or {}
502506

503507
def _as_dict(self):
504508
return {
@@ -510,7 +514,8 @@ def _as_dict(self):
510514
'extras': self.extras,
511515
'location': self.location,
512516
'tables_labels': self.tables_labels,
513-
'dataset_labels': self.dataset_labels
517+
'dataset_labels': self.dataset_labels,
518+
'job_labels': self.job_labels
514519
}
515520

516521

docs/technologies.md

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ to call BigQuery SQL.
309309
Fully qualified names of internal tables are resolved to `{project_id}.{dataset_name}.{table_name}`.
310310
* `external_tables` — Dict that defines aliases for external table names.
311311
Fully qualified names of those tables have to be declared explicitly.
312+
* `job_labels` — Dict of labels that will be set on BigQuery jobs.
312313

313314
The distinction between internal and external tables shouldn't be treated too seriously.
314315
Internal means `mine`. External means any other. It's just a naming convention.
@@ -511,7 +512,7 @@ The `table_labels` and `dataset_labels` parameters allow your workflow to create
511512
On the first run, the tables do not exist yet, so labels cannot be created at that point. Labels are added on the second and subsequent runs, once the tables have been created.
512513

513514
```python
514-
from bigflow.bigquery import DatasetConfig
515+
from bigflow.bigquery import DatasetConfig
515516

516517
dataset_config = DatasetConfig(
517518
env='dev',
@@ -526,8 +527,19 @@ dataset_config = DatasetConfig(
526527
}
527528
},
528529
dataset_labels={"dataset_label_1": "value_1", "dataset_label_2": "value_2"}).create_dataset_manager()
530+
```
529531

532+
The `job_labels` argument allows you to label a BigQuery job. It is passed to [`QueryJobConfig.labels`](https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJobConfig#google_cloud_bigquery_job_QueryJobConfig_labels)
533+
in the `write` and `collect` methods of `DatasetManager`.
530534

531-
```
535+
```python
536+
from bigflow.bigquery import DatasetConfig
532537

533-
You can us it as an ad-hoc tool or put a labeling job to a workflow as well.
538+
dataset_config = DatasetConfig(
539+
env='dev',
540+
project_id='your-project-id',
541+
dataset_name='example_dataset',
542+
internal_tables=['example_table'],
543+
external_tables={},
544+
job_labels={"owner": "John Doe"}).create_dataset_manager()
545+
```

test/test_job.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def test_component(bigquery_dependency1, bigquery_dependency2):
2828
'location': 'EU',
2929
'tables_labels': {},
3030
'dataset_labels': {},
31+
'job_labels': {},
3132
})
3233

3334
# and
@@ -42,6 +43,7 @@ def test_component(bigquery_dependency1, bigquery_dependency2):
4243
'location': 'EU',
4344
'tables_labels': {},
4445
'dataset_labels': {},
46+
'job_labels': {},
4547
})
4648

4749
job = Job(component=test_component,

0 commit comments

Comments
 (0)