
Commit 52e072e

Merge pull request #1338 from cloudsufi/cherrypick-label-sink
Cherrypick - [PLUGIN-1705] Sink job label support #1319
2 parents d755525 + 21e802d commit 52e072e

15 files changed: +416 −30 lines

docs/BigQueryMultiTable-batchsink.md

Lines changed: 8 additions & 0 deletions
@@ -42,6 +42,14 @@ write BigQuery data to this project.
 Datasets are top-level containers that are used to organize and control access to tables and views.
 If dataset does not exist, it will be created.

+**BQ Job Labels:** Key-value pairs to be added as labels to the BigQuery job. Keys must be unique. (Macro Enabled)
+
+`job_source` and `type` are system-defined labels used by CDAP for internal purposes and cannot be used as label keys.
+Macro format is supported, for example `key1:val1,key2:val2`.
+
+Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
+For more information about labels, see the [BigQuery documentation](https://cloud.google.com/bigquery/docs/labels-intro#requirements).
+
 **Temporary Bucket Name:** Google Cloud Storage bucket to store temporary data in.
 It will be automatically created if it does not exist. Temporary data will be deleted after it is loaded into BigQuery.
 If the bucket was created automatically, it will be deleted after the run finishes.

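To illustrate the `key1:val1,key2:val2` format described above, here is a minimal standalone sketch of how such a string breaks down into individual labels. The label names (`team`, `env`, `adhoc`) and the parsing loop are illustrative only and are not part of the plugin code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JobLabelFormatExample {
  public static void main(String[] args) {
    // Hypothetical label string in the documented format; a key with no ':' is a label without a value.
    String jobLabels = "team:data-eng,env:prod,adhoc";

    Map<String, String> labels = new LinkedHashMap<>();
    for (String pair : jobLabels.split(",")) {
      String[] kv = pair.trim().split(":", 2);
      labels.put(kv[0], kv.length == 2 ? kv[1] : "");
    }

    // Prints {team=data-eng, env=prod, adhoc=}
    System.out.println(labels);
  }
}
```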
docs/BigQueryTable-batchsink.md

Lines changed: 8 additions & 0 deletions
@@ -52,6 +52,14 @@ bucket will be created and then deleted after the run finishes.

 **GCS Upload Request Chunk Size**: GCS upload request chunk size in bytes. Default value is 8388608 bytes.

+**BQ Job Labels:** Key-value pairs to be added as labels to the BigQuery job. Keys must be unique. (Macro Enabled)
+
+`job_source` and `type` are system-defined labels used by CDAP for internal purposes and cannot be used as label keys.
+Macro format is supported, for example `key1:val1,key2:val2`.
+
+Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
+For more information about labels, see the [BigQuery documentation](https://cloud.google.com/bigquery/docs/labels-intro#requirements).
+
 **JSON String**: List of fields to be written to BigQuery as a JSON string.
 The fields must be of type STRING. To target nested fields, use dot notation.
 For example, 'name.first' will target the 'first' field in the 'name' record. (Macro Enabled)

src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java

Lines changed: 1 addition & 1 deletion
@@ -125,7 +125,7 @@ public void run(ActionContext context) throws Exception {
     }

     // Add labels for the BigQuery Execute job.
-    builder.setLabels(BigQueryUtil.getJobTags(BigQueryUtil.BQ_JOB_TYPE_EXECUTE_TAG));
+    builder.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_EXECUTE_TAG));

     QueryJobConfiguration queryConfig = builder.build();

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 6 additions & 1 deletion
@@ -107,12 +107,17 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
     bucketName = BigQuerySinkUtils.configureBucket(baseConfiguration, bucketName, runUUID.toString());
     Bucket bucket = storage.get(bucketName);

+    // Set user defined job label key value pair
+    String jobLabelKeyValue = getConfig().getJobLabelKeyValue();
+    if (jobLabelKeyValue != null) {
+      baseConfiguration.set(BigQueryConstants.CONFIG_JOB_LABEL_KEY_VALUE, jobLabelKeyValue);
+    }
+
     if (!context.isPreviewEnabled()) {
       BigQuerySinkUtils.createResources(bigQuery, dataset, datasetId,
                                         storage, bucket, bucketName,
                                         config.getLocation(), cmekKeyName);
     }
-
     prepareRunInternal(context, bigQuery, bucketName);
   }

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java

Lines changed: 118 additions & 1 deletion
@@ -34,6 +34,7 @@
 import io.cdap.plugin.gcp.common.CmekUtils;

 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
@@ -49,6 +50,7 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
   public static final String NAME_TRUNCATE_TABLE = "truncateTable";
   public static final String NAME_LOCATION = "location";
   private static final String NAME_GCS_CHUNK_SIZE = "gcsChunkSize";
+  public static final String NAME_BQ_JOB_LABELS = "jobLabels";
   protected static final String NAME_UPDATE_SCHEMA = "allowSchemaRelaxation";
   private static final String SCHEME = "gs://";
   protected static final String NAME_JSON_STRING_FIELDS = "jsonStringFields";
@@ -85,6 +87,13 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
     "This value is ignored if the dataset or temporary bucket already exist.")
   protected String location;

+  @Name(NAME_BQ_JOB_LABELS)
+  @Macro
+  @Nullable
+  @Description("Key value pairs to be added as labels to the BigQuery job. Keys must be unique. [job_source, type] " +
+    "are reserved keys and cannot be used as label keys.")
+  protected String jobLabelKeyValue;
+
   @Name(NAME_JSON_STRING_FIELDS)
   @Nullable
   @Description("Fields in input schema that should be treated as JSON strings. " +
@@ -120,6 +129,10 @@ protected String getTable() {
   public String getGcsChunkSize() {
     return gcsChunkSize;
   }
+  @Nullable
+  public String getJobLabelKeyValue() {
+    return jobLabelKeyValue;
+  }

   @Nullable
   public String getJsonStringFields() {
@@ -138,7 +151,6 @@ public JobInfo.WriteDisposition getWriteDisposition() {
   public boolean isTruncateTableSet() {
     return truncateTable != null && truncateTable;
   }
-
   public void validate(FailureCollector collector) {
     validate(collector, Collections.emptyMap());
   }
@@ -161,6 +173,9 @@ public void validate(FailureCollector collector, Map<String, String> arguments)
     if (!containsMacro(NAME_CMEK_KEY)) {
       validateCmekKey(collector, arguments);
     }
+    if (!containsMacro(NAME_BQ_JOB_LABELS)) {
+      validateJobLabelKeyValue(collector);
+    }
   }

   void validateCmekKey(FailureCollector failureCollector, Map<String, String> arguments) {
@@ -172,6 +187,108 @@ void validateCmekKey(FailureCollector failureCollector, Map<String, String> argu
     validateCmekKeyLocation(cmekKeyName, null, location, failureCollector);
   }

+  /**
+   * Validates job label key value pairs, as per the following rules:
+   * Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
+   * Defined in the following link:
+   * <a href="https://cloud.google.com/bigquery/docs/labels-intro#requirements">Docs</a>
+   * @param failureCollector failure collector
+   */
+  void validateJobLabelKeyValue(FailureCollector failureCollector) {
+    Set<String> reservedKeys = BigQueryUtil.BQ_JOB_LABEL_SYSTEM_KEYS;
+    int maxLabels = 64 - reservedKeys.size();
+    int maxKeyLength = 63;
+    int maxValueLength = 63;
+
+    String validLabelKeyRegex = "^[\\p{L}][a-z0-9-_\\p{L}]+$";
+    String validLabelValueRegex = "^[a-z0-9-_\\p{L}]+$";
+    String capitalLetterRegex = ".*[A-Z].*";
+    String labelKeyValue = getJobLabelKeyValue();
+
+    if (Strings.isNullOrEmpty(labelKeyValue)) {
+      return;
+    }
+
+    String[] keyValuePairs = labelKeyValue.split(",");
+    Set<String> uniqueKeys = new HashSet<>();
+
+    for (String keyValuePair : keyValuePairs) {
+
+      // Adding a label without a value is valid behavior
+      // Read more here: https://cloud.google.com/bigquery/docs/adding-labels#adding_a_label_without_a_value
+      String[] keyValue = keyValuePair.trim().split(":");
+      boolean isKeyPresent = keyValue.length == 1 || keyValue.length == 2;
+      boolean isValuePresent = keyValue.length == 2;
+
+      if (!isKeyPresent) {
+        failureCollector.addFailure(String.format("Invalid job label key value pair '%s'.", keyValuePair),
+                                    "Job label key value pair should be in the format 'key:value'.")
+          .withConfigProperty(NAME_BQ_JOB_LABELS);
+        continue;
+      }
+
+      // Check if key is reserved
+      if (reservedKeys.contains(keyValue[0])) {
+        failureCollector.addFailure(String.format("Invalid job label key '%s'.", keyValue[0]),
+                                    "A system label already exists with same name.").withConfigProperty(NAME_BQ_JOB_LABELS);
+        continue;
+      }
+
+      String key = keyValue[0];
+      String value = isValuePresent ? keyValue[1] : "";
+      boolean isKeyValid = true;
+      boolean isValueValid = true;
+
+      // Key cannot be empty
+      if (Strings.isNullOrEmpty(key)) {
+        failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
                                    "Job label key cannot be empty.").withConfigProperty(NAME_BQ_JOB_LABELS);
+        isKeyValid = false;
+      }
+
+      // Key cannot be longer than 63 characters
+      if (key.length() > maxKeyLength) {
+        failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
+                                    "Job label key cannot be longer than 63 characters.").withConfigProperty(NAME_BQ_JOB_LABELS);
+        isKeyValid = false;
+      }
+
+      // Value cannot be longer than 63 characters
+      if (value.length() > maxValueLength) {
+        failureCollector.addFailure(String.format("Invalid job label value '%s'.", value),
+                                    "Job label value cannot be longer than 63 characters.").withConfigProperty(NAME_BQ_JOB_LABELS);
+        isValueValid = false;
+      }
+
+      if (isKeyValid && (!key.matches(validLabelKeyRegex) || key.matches(capitalLetterRegex))) {
+        failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
+                                    "Job label key can only contain lowercase letters, numeric characters, " +
+                                      "underscores, and dashes. Check docs for more details.")
+          .withConfigProperty(NAME_BQ_JOB_LABELS);
+        isKeyValid = false;
+      }
+
+      if (isValuePresent && isValueValid &&
+        (!value.matches(validLabelValueRegex) || value.matches(capitalLetterRegex))) {
+        failureCollector.addFailure(String.format("Invalid job label value '%s'.", value),
+                                    "Job label value can only contain lowercase letters, numeric characters, " +
+                                      "underscores, and dashes.").withConfigProperty(NAME_BQ_JOB_LABELS);
+      }
+
+      if (isKeyValid && !uniqueKeys.add(key)) {
+        failureCollector.addFailure(String.format("Duplicate job label key '%s'.", key),
+                                    "Job label key should be unique.").withConfigProperty(NAME_BQ_JOB_LABELS);
+      }
+    }
+    // Check if number of labels is greater than 64 - reserved keys
+    if (uniqueKeys.size() > maxLabels) {
+      failureCollector.addFailure("Number of job labels exceeds the limit.",
+                                  String.format("Number of job labels cannot be greater than %d.", maxLabels))
+        .withConfigProperty(NAME_BQ_JOB_LABELS);
+    }
+  }
+
   public String getDatasetProject() {
     return connection == null ? null : connection.getDatasetProject();
   }

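As a quick illustration of the rules `validateJobLabelKeyValue` enforces, the standalone sketch below applies the same regular expressions and reserved-key check to a few hypothetical label pairs; the real method reports problems through CDAP's `FailureCollector` rather than printing them.

```java
import java.util.Arrays;

public class JobLabelRulesSketch {
  // Regexes copied from validateJobLabelKeyValue above.
  private static final String VALID_KEY_REGEX = "^[\\p{L}][a-z0-9-_\\p{L}]+$";
  private static final String VALID_VALUE_REGEX = "^[a-z0-9-_\\p{L}]+$";
  private static final String CAPITAL_LETTER_REGEX = ".*[A-Z].*";

  public static void main(String[] args) {
    // Sample inputs are hypothetical.
    for (String pair : Arrays.asList("env:prod", "cost-center:1234", "Env:prod", "job_source:custom", "team")) {
      String[] kv = pair.split(":");
      boolean keyPresent = kv.length == 1 || kv.length == 2;
      boolean keyValid = keyPresent
        && kv[0].matches(VALID_KEY_REGEX)
        && !kv[0].matches(CAPITAL_LETTER_REGEX)
        && !kv[0].equals("job_source") && !kv[0].equals("type");   // reserved system keys
      boolean valueValid = kv.length < 2
        || (kv[1].matches(VALID_VALUE_REGEX) && !kv[1].matches(CAPITAL_LETTER_REGEX));
      System.out.printf("%-18s key valid: %-5s value valid: %s%n", pair, keyValid, valueValid);
    }
  }
}
```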
src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java

Lines changed: 17 additions & 12 deletions
@@ -232,6 +232,7 @@ public void commitJob(JobContext jobContext) throws IOException {
     allowSchemaRelaxationOnEmptyOutput =
       conf.getBoolean(BigQueryConstants.CONFIG_ALLOW_SCHEMA_RELAXATION_ON_EMPTY_OUTPUT, false);
     LOG.debug("Allow schema relaxation: '{}'", allowSchemaRelaxation);
+    String jobLabelKeyValue = conf.get(BigQueryConstants.CONFIG_JOB_LABEL_KEY_VALUE, null);
     PartitionType partitionType = conf.getEnum(BigQueryConstants.CONFIG_PARTITION_TYPE, PartitionType.NONE);
     LOG.debug("Create Partitioned Table type: '{}'", partitionType);
     com.google.cloud.bigquery.TimePartitioning.Type timePartitioningType = conf.getEnum(
@@ -263,7 +264,7 @@ public void commitJob(JobContext jobContext) throws IOException {
     try {
       importFromGcs(destProjectId, destTable, destSchema.orElse(null), kmsKeyName, outputFileFormat,
                     writeDisposition, sourceUris, partitionType, timePartitioningType, range, partitionByField,
-                    requirePartitionFilter, clusteringOrderList, tableExists, conf);
+                    requirePartitionFilter, clusteringOrderList, tableExists, jobLabelKeyValue, conf);
     } catch (Exception e) {
       throw new IOException("Failed to import GCS into BigQuery. ", e);
     }
@@ -309,7 +310,8 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable
                              List<String> gcsPaths, PartitionType partitionType,
                              com.google.cloud.bigquery.TimePartitioning.Type timePartitioningType,
                              @Nullable Range range, @Nullable String partitionByField, boolean requirePartitionFilter,
-                             List<String> clusteringOrderList, boolean tableExists, Configuration conf)
+                             List<String> clusteringOrderList, boolean tableExists, String jobLabelKeyValue,
+                             Configuration conf)
     throws IOException, InterruptedException {
     LOG.info("Importing into table '{}' from {} paths; path[0] is '{}'; awaitCompletion: {}",
              BigQueryStrings.toString(tableRef), gcsPaths.size(), gcsPaths.isEmpty() ? "(empty)" : gcsPaths.get(0),
@@ -431,18 +433,18 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable

       JobConfiguration config = new JobConfiguration();
       config.setLoad(loadConfig);
-      config.setLabels(BigQueryUtil.getJobTags(BigQueryUtil.BQ_JOB_TYPE_SINK_TAG));
+      config.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_SINK_TAG, jobLabelKeyValue));
       triggerBigqueryJob(projectId, jobId , dataset, config, tableRef);
     } else {
       // First load the data in a temp table.
-      loadInBatchesInTempTable(tableRef, loadConfig, gcsPaths, projectId, jobId, dataset);
+      loadInBatchesInTempTable(tableRef, loadConfig, gcsPaths, projectId, jobId, dataset, jobLabelKeyValue);

       if (operation.equals(Operation.INSERT)) { // For the case when gcs paths is more than 10000
         handleInsertOperation(tableRef, writeDisposition, loadConfig.getDestinationEncryptionConfiguration(),
-                              projectId, jobId, dataset, tableExists);
+                              projectId, jobId, dataset, tableExists, jobLabelKeyValue);
       } else {
         handleUpdateUpsertOperation(tableRef, tableExists, kmsKeyName, getJobIdForUpdateUpsert(conf),
-                                    projectId, dataset);
+                                    projectId, dataset, jobLabelKeyValue);
       }
     }

@@ -471,7 +473,8 @@ private void triggerBigqueryJob(String projectId, String jobId, Dataset dataset,
   }

   private void loadInBatchesInTempTable(TableReference tableRef, JobConfigurationLoad loadConfig,
-                                        List<String> gcsPaths, String projectId, String jobId, Dataset dataset)
+                                        List<String> gcsPaths, String projectId, String jobId, Dataset dataset,
+                                        String jobLabelKeyValue)
     throws IOException, InterruptedException {

     LOG.info(" Importing into a temporary table first in batches of 10000");
@@ -495,7 +498,7 @@ private void loadInBatchesInTempTable(TableReference tableRef, JobConfigurationL
       loadConfig.setSourceUris(gcsPathBatch);
       JobConfiguration config = new JobConfiguration();
       config.setLoad(loadConfig);
-      config.setLabels(BigQueryUtil.getJobTags(BigQueryUtil.BQ_JOB_TYPE_SINK_TAG));
+      config.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_SINK_TAG, jobLabelKeyValue));

       triggerBigqueryJob(projectId, jobId + "_" + jobcount, dataset, config, tableRef);
       jobcount++;
@@ -627,7 +630,8 @@ private static Optional<TableSchema> getTableSchema(Configuration conf) throws I

   private void handleInsertOperation(TableReference tableRef, String writeDisposition,
                                      EncryptionConfiguration encryptionConfiguration, String projectId, String jobId,
-                                     Dataset dataset, boolean tableExists) throws IOException, InterruptedException {
+                                     Dataset dataset, boolean tableExists,
+                                     String jobLabelKeyValue) throws IOException, InterruptedException {
     if (allowSchemaRelaxation && tableExists) {
       updateTableSchema(tableRef);
     }
@@ -639,7 +643,7 @@ private void handleInsertOperation(TableReference tableRef, String writeDisposit

     JobConfiguration config = new JobConfiguration();
     config.setCopy(tableCopyConfig);
-    config.setLabels(BigQueryUtil.getJobTags(BigQueryUtil.BQ_JOB_TYPE_SINK_TAG));
+    config.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_SINK_TAG, jobLabelKeyValue));
     triggerBigqueryJob(projectId, jobId, dataset, config, tableRef);
   }

@@ -648,7 +652,8 @@ private void handleUpdateUpsertOperation(TableReference tableRef,
                                            @Nullable String cmekKey,
                                            JobId jobId,
                                            String projectId,
-                                           Dataset dataset) throws IOException, InterruptedException {
+                                           Dataset dataset,
+                                           String jobLabelKeyValue) throws IOException, InterruptedException {
     if (allowSchemaRelaxation && tableExists) {
       updateTableSchema(tableRef);
     }
@@ -677,7 +682,7 @@ private void handleUpdateUpsertOperation(TableReference tableRef,

     // Create Job Configuration and add job labels
     JobConfiguration jobConfiguration = new JobConfiguration();
-    jobConfiguration.setLabels(BigQueryUtil.getJobTags(BigQueryUtil.BQ_JOB_TYPE_SINK_TAG));
+    jobConfiguration.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_SINK_TAG, jobLabelKeyValue));
     jobConfiguration.setQuery(jobConfigurationQuery);

     // Trigger job execution

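The new two-argument `BigQueryUtil.getJobLabels(...)` overload is not shown in this part of the diff, so the following is only a rough sketch, under the assumption that it merges the system labels (`job_source`, `type`) with the user-supplied label string before they are attached to the load/copy/query job. The `"cdap"` value and the `"bq_sink"` tag are placeholders, not the plugin's actual constants.

```java
import java.util.HashMap;
import java.util.Map;

public class GetJobLabelsSketch {

  // Assumed behaviour only: combine system labels with user labels parsed from "key1:val1,key2:val2".
  static Map<String, String> getJobLabels(String jobType, String jobLabelKeyValue) {
    Map<String, String> labels = new HashMap<>();
    labels.put("job_source", "cdap");  // placeholder value
    labels.put("type", jobType);
    if (jobLabelKeyValue != null && !jobLabelKeyValue.isEmpty()) {
      for (String pair : jobLabelKeyValue.split(",")) {
        String[] kv = pair.trim().split(":", 2);
        labels.put(kv[0], kv.length == 2 ? kv[1] : "");
      }
    }
    return labels;
  }

  public static void main(String[] args) {
    // e.g. {job_source=cdap, type=bq_sink, team=data-eng, env=prod} (order not guaranteed)
    System.out.println(getJobLabels("bq_sink", "team:data-eng,env:prod"));
  }
}
```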
src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java

Lines changed: 9 additions & 2 deletions
@@ -197,12 +197,13 @@ private BigQuerySinkConfig(@Nullable String referenceName, @Nullable String proj
                              @Nullable String serviceAccountType, @Nullable String serviceFilePath,
                              @Nullable String serviceAccountJson,
                              @Nullable String dataset, @Nullable String table, @Nullable String location,
-                             @Nullable String cmekKey, @Nullable String bucket) {
+                             @Nullable String cmekKey, @Nullable String bucket, @Nullable String jobLabelKeyValue) {
     super(new BigQueryConnectorConfig(project, project, serviceAccountType,
                                       serviceFilePath, serviceAccountJson), dataset, cmekKey, bucket);
     this.referenceName = referenceName;
     this.table = table;
     this.location = location;
+    this.jobLabelKeyValue = jobLabelKeyValue;
   }

   public String getTable() {
@@ -696,6 +697,7 @@ public static class Builder {
     private String cmekKey;
     private String location;
     private String bucket;
+    private String jobLabelKeyValue;

     public BigQuerySinkConfig.Builder setReferenceName(@Nullable String referenceName) {
       this.referenceName = referenceName;
@@ -746,6 +748,10 @@ public BigQuerySinkConfig.Builder setBucket(@Nullable String bucket) {
       this.bucket = bucket;
       return this;
     }
+    public BigQuerySinkConfig.Builder setJobLabelKeyValue(@Nullable String jobLabelKeyValue) {
+      this.jobLabelKeyValue = jobLabelKeyValue;
+      return this;
+    }

     public BigQuerySinkConfig build() {
       return new BigQuerySinkConfig(
@@ -758,7 +764,8 @@ public BigQuerySinkConfig build() {
         table,
         location,
         cmekKey,
-        bucket
+        bucket,
+        jobLabelKeyValue
       );
     }

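A plausible use of the new builder setter, for example in a plugin unit test. Only setters visible in this commit are used; how the `Builder` instance is obtained and the placeholder values are assumptions.

```java
// Assumes the plugin classes are on the classpath; values are placeholders.
BigQuerySinkConfig config = new BigQuerySinkConfig.Builder()
  .setReferenceName("bq-sink")
  .setBucket("my-temp-bucket")
  .setJobLabelKeyValue("team:data-eng,env:prod")
  .build();

// The label string is then validated by AbstractBigQuerySinkConfig.validate(...) via
// validateJobLabelKeyValue, and propagated to the Hadoop configuration in prepareRun.
```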
src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java

Lines changed: 1 addition & 1 deletion
@@ -257,7 +257,7 @@ private static void runQuery(Configuration configuration,

     JobConfiguration config = new JobConfiguration();
     config.setQuery(queryConfig);
-    config.setLabels(BigQueryUtil.getJobTags(BigQueryUtil.BQ_JOB_TYPE_SOURCE_TAG));
+    config.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_SOURCE_TAG));

     JobReference jobReference = getJobReference(configuration, bigQueryHelper, projectId, location);

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java

Lines changed: 1 addition & 1 deletion
@@ -357,7 +357,7 @@ public static Map<String, String> getJobTags(BigQueryJobType operation) {
    * @return Map containing tags for a job.
    */
   public static Map<String, String> getJobTags(String operation) {
-    Map<String, String> labels = BigQueryUtil.getJobTags(BigQueryUtil.BQ_JOB_TYPE_PUSHDOWN_TAG);
+    Map<String, String> labels = BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_PUSHDOWN_TAG);
     labels.put("pushdown_operation", operation);
     return labels;
   }
