diff --git a/README.md b/README.md
index 59136720..6cbbeaa5 100644
--- a/README.md
+++ b/README.md
@@ -22,7 +22,7 @@ Prerequisites:
```
git clone https://github.com/GoogleCloudDataproc/flink-bigquery-connector
-cd flink-connector-bigquery
+cd flink-bigquery-connector
mvn clean package -DskipTests
```
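
Once the jar is installed to the local Maven repository, the connector can be consumed from a DataStream job. A minimal, hedged sketch mirroring the example module added later in this diff (the class name and the project/dataset/table coordinates are hypothetical placeholders):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.source.BigQuerySource;
import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions;
import org.apache.avro.generic.GenericRecord;

/** Sketch only: read rows as Avro GenericRecords, mirroring BigQueryExample in this diff. */
public class QuickStart {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        BigQuerySource<GenericRecord> source =
                BigQuerySource.readAvros(
                        BigQueryReadOptions.builder()
                                .setBigQueryConnectOptions(
                                        BigQueryConnectOptions.builder()
                                                .setProjectId("some-gcp-project") // hypothetical
                                                .setDataset("some_dataset") // hypothetical
                                                .setTable("some_table") // hypothetical
                                                .build())
                                .build(),
                        -1); // -1: no record limit, following the example's default
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "BigQuerySource").print();
        env.execute("BigQuery quick start");
    }
}
```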
diff --git a/cloudbuild/cloudbuild.yaml b/cloudbuild/cloudbuild.yaml
index 63db5a5e..d62996a8 100644
--- a/cloudbuild/cloudbuild.yaml
+++ b/cloudbuild/cloudbuild.yaml
@@ -13,27 +13,15 @@ steps:
env:
- 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
-# 3. Run unit tests
+# 3. Run unit & integration tests
- name: 'gcr.io/$PROJECT_ID/dataproc-flink-bigquery-connector-presubmit'
id: 'unit-tests'
waitFor: ['init']
entrypoint: 'bash'
- args: ['/workspace/cloudbuild/presubmit.sh', 'unittest']
+ args: ['/workspace/cloudbuild/presubmit.sh', 'tests']
env:
- 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
-# 4. Run integration tests concurrently with unit tests
-# Commented out until integration tests are ported
-# - name: 'gcr.io/$PROJECT_ID/dataproc-flink-bigquery-connector-presubmit'
-# id: 'integration-tests'
-# waitFor: ['unit-tests']
-# entrypoint: 'bash'
-# args: ['/workspace/cloudbuild/presubmit.sh', 'integrationtest']
-# env:
-# - 'GOOGLE_CLOUD_PROJECT=${_GOOGLE_CLOUD_PROJECT}'
-# - 'TEMPORARY_GCS_BUCKET=${_TEMPORARY_GCS_BUCKET}'
-# - 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
-
# Tests take around 20 mins in general.
timeout: 1800s
diff --git a/cloudbuild/presubmit.sh b/cloudbuild/presubmit.sh
index 8ed18096..71e7162d 100644
--- a/cloudbuild/presubmit.sh
+++ b/cloudbuild/presubmit.sh
@@ -30,18 +30,13 @@ cd /workspace
case $STEP in
# Download maven and all the dependencies
init)
- $MVN install -DskipTests
+ $MVN clean install -DskipTests
exit
;;
- # Run unit tests
- unittest)
- $MVN test jacoco:report jacoco:report-aggregate
- ;;
-
- # Run integration tests
- integrationtest)
- $MVN failsafe:integration-test failsafe:verify jacoco:report jacoco:report-aggregate
+ # Run unit & integration tests
+ tests)
+ $MVN clean clover:setup verify clover:aggregate clover:clover -Pclover -pl flink-connector-bigquery
;;
*)
diff --git a/flink-connector-bigquery-examples/pom.xml b/flink-connector-bigquery-examples/pom.xml
new file mode 100644
index 00000000..cc183d30
--- /dev/null
+++ b/flink-connector-bigquery-examples/pom.xml
@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-bigquery-parent</artifactId>
+        <version>1.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-connector-bigquery-examples</artifactId>
+    <name>Flink : Connectors : Google BigQuery Examples</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <japicmp.skip>true</japicmp.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-bigquery</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>BigQueryExample</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <resources>
+                                <resource>
+                                    <directory>src/test/resources</directory>
+                                </resource>
+                            </resources>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.apache.flink.examples.gcp.bigquery.BigQueryExample</mainClass>
+                                </transformer>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                    <resource>log4j2-bqexample.properties</resource>
+                                    <file>src/main/resources/log4j2-bqexample.properties</file>
+                                </transformer>
+                            </transformers>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.flink:flink-connector-bigquery</include>
+                                    <include>org.apache.flink:flink-avro</include>
+                                    <include>org.apache.flink:flink-metrics-dropwizard</include>
+                                    <include>com.google.cloud:google-cloud-bigquerystorage</include>
+                                    <include>com.google.*:*</include>
+                                    <include>commons-codec:commons-codec</include>
+                                    <include>dev.failsafe:*</include>
+                                    <include>org.apache.avro:*</include>
+                                    <include>org.apache.httpcomponents:*</include>
+                                    <include>org.codehaus.mojo:animal-sniffer-annotations</include>
+                                    <include>org.conscrypt:*</include>
+                                    <include>com.fasterxml.jackson.*:*</include>
+                                    <include>org.threeten:*</include>
+                                    <include>org.checkerframework:*</include>
+                                    <include>io.dropwizard.metrics:*</include>
+                                    <include>io.grpc:*</include>
+                                    <include>io.opencensus:*</include>
+                                    <include>io.perfmark:*</include>
+                                </includes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>org.apache.flink.examples.gcp.bigquery.shaded.com.google</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <filters>
+                                <filter>
+                                    <artifact>org.apache.flink:flink-connector-bigquery-examples*</artifact>
+                                    <includes>
+                                        <include>org/apache/flink/examples/gcp/bigquery/**</include>
+                                    </includes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/flink-connector-bigquery-examples/src/main/java/org/apache/flink/examples/gcp/bigquery/BigQueryExample.java b/flink-connector-bigquery-examples/src/main/java/org/apache/flink/examples/gcp/bigquery/BigQueryExample.java
new file mode 100644
index 00000000..df6b6f4e
--- /dev/null
+++ b/flink-connector-bigquery-examples/src/main/java/org/apache/flink/examples/gcp/bigquery/BigQueryExample.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.gcp.bigquery;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
+import com.google.cloud.flink.bigquery.source.BigQuerySource;
+import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions;
+import org.apache.avro.generic.GenericRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple Flink application using the DataStream API and the BigQuery connector.
+ *
+ * <p>The Flink pipeline will try to read the specified BigQuery table, limiting the element count
+ * to the specified row restriction and limit, returning {@link GenericRecord} representing the
+ * rows, and finally printing out some aggregated values given the provided payload's field. The
+ * sequence of operations in this pipeline is: source > flatMap > keyBy > max > print.
+ *
+ * <p>Flink command line format to execute this application:
+ * flink run {additional runtime params} {path to this jar}/BigQueryExample.jar
+ * --gcp-project {required; project ID which contains the BigQuery table}
+ * --bq-dataset {required; name of BigQuery dataset containing the desired table}
+ * --bq-table {required; name of BigQuery table to read}
+ * --agg-prop {required; record property to aggregate in Flink job}
+ * --restriction {optional; SQL-like filter applied at the BigQuery table before reading}
+ * --limit {optional; maximum records to read from BigQuery table}
+ * --checkpoint-interval {optional; time interval between state checkpoints in milliseconds}
+ *
+ * <p>Note on row restriction: In case a restriction relies on a temporal reference, something like
+ * {@code "TIMESTAMP_TRUNC(ingestion_timestamp, HOUR) = '2023-06-20 19:00:00'"}, and the job is
+ * launched from Flink's REST API, a known issue is that single quotes are not supported and will
+ * cause the pipeline to fail. As a workaround, using \u0027 instead of the quotes will work. For
+ * example {@code "TIMESTAMP_TRUNC(ingestion_timestamp, HOUR) = \u00272023-06-20 19:00:00\u0027"}.
+ */
+public class BigQueryExample {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BigQueryExample.class);
+
+ public static void main(String[] args) throws Exception {
+ // parse input arguments
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ if (parameterTool.getNumberOfParameters() < 4) {
+ LOG.error(
+ "Missing parameters!\n"
+ + "Usage: flink run BigQueryExample.jar"
+ + " --gcp-project <gcp project id>"
+ + " --bq-dataset <dataset name>"
+ + " --bq-table <table name>"
+ + " --agg-prop <record property to aggregate>"
+ + " --restriction <optional: row filter predicate>"
+ + " --limit <optional: max records to read>"
+ + " --checkpoint-interval <optional: milliseconds between state checkpoints>");
+ return;
+ }
+
+ String projectName = parameterTool.getRequired("gcp-project");
+ String datasetName = parameterTool.getRequired("bq-dataset");
+ String tableName = parameterTool.getRequired("bq-table");
+ String rowRestriction = parameterTool.get("restriction", "").replace("\\u0027", "'");
+ Integer recordLimit = parameterTool.getInt("limit", -1);
+ String recordPropertyToAggregate = parameterTool.getRequired("agg-prop");
+ Long checkpointInterval = parameterTool.getLong("checkpoint-interval", 60000L);
+
+ runFlinkJob(
+ projectName,
+ datasetName,
+ tableName,
+ recordPropertyToAggregate,
+ rowRestriction,
+ recordLimit,
+ checkpointInterval);
+ }
+
+ private static void runFlinkJob(
+ String projectName,
+ String datasetName,
+ String tableName,
+ String recordPropertyToAggregate,
+ String rowRestriction,
+ Integer limit,
+ Long checkpointInterval)
+ throws Exception {
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(checkpointInterval);
+
+ /*
+ * We will be reading Avro generic records from BigQuery, and in this case we are assuming the
+ * GOOGLE_APPLICATION_CREDENTIALS env variable will be present in the execution runtime. If a
+ * different authentication method is needed, the credentials builder (part of the
+ * BigQueryConnectOptions) should enable capturing the credentials from various sources.
+ */
+ BigQuerySource<GenericRecord> bqSource =
+ BigQuerySource.readAvros(
+ BigQueryReadOptions.builder()
+ .setBigQueryConnectOptions(
+ BigQueryConnectOptions.builder()
+ .setProjectId(projectName)
+ .setDataset(datasetName)
+ .setTable(tableName)
+ .build())
+ .setRowRestriction(rowRestriction)
+ .build(),
+ limit);
+
+ env.fromSource(bqSource, WatermarkStrategy.noWatermarks(), "BigQuerySource")
+ .flatMap(new FlatMapper(recordPropertyToAggregate))
+ .keyBy(mappedTuple -> mappedTuple.f0)
+ .max("f1")
+ .print();
+
+ env.execute("Flink BigQuery Example");
+ }
+
+ static class FlatMapper implements FlatMapFunction<GenericRecord, Tuple2<String, Integer>> {
+
+ private final String recordPropertyToAggregate;
+
+ public FlatMapper(String recordPropertyToAggregate) {
+ this.recordPropertyToAggregate = recordPropertyToAggregate;
+ }
+
+ @Override
+ public void flatMap(GenericRecord record, Collector<Tuple2<String, Integer>> out)
+ throws Exception {
+ out.collect(Tuple2.of(record.get(recordPropertyToAggregate).toString(), 1));
+ }
+ }
+}
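
The \u0027 workaround called out in the javadoc above can be exercised in isolation. A standalone sketch (class name illustrative) of the same normalization the example applies to the --restriction parameter:

```java
/** Illustrative only: the \u0027 escaping round-trip described in the example's javadoc. */
public class RestrictionEscapeDemo {
    public static void main(String[] args) {
        // The restriction as it would be passed through Flink's REST API (no single quotes).
        String escaped =
                "TIMESTAMP_TRUNC(ingestion_timestamp, HOUR) = \\u00272023-06-20 19:00:00\\u0027";
        // The same replace(...) call BigQueryExample applies to the parsed parameter.
        String restriction = escaped.replace("\\u0027", "'");
        // Prints: TIMESTAMP_TRUNC(ingestion_timestamp, HOUR) = '2023-06-20 19:00:00'
        System.out.println(restriction);
    }
}
```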
diff --git a/flink-connector-bigquery/pom.xml b/flink-connector-bigquery/pom.xml
index c75d8643..cfcd2d82 100644
--- a/flink-connector-bigquery/pom.xml
+++ b/flink-connector-bigquery/pom.xml
@@ -162,58 +162,10 @@ under the License.
org.apache.maven.plugins
maven-jar-plugin
-
-
-
- test-jar
-
-
-
org.apache.maven.plugins
maven-surefire-plugin
-
-
- default-test
- test
-
- test
-
-
- ${argLine} -XX:+UseG1GC -Xms256m -Xmx1024m
-
-
-
- integration-tests
- integration-test
-
- test
-
-
- -XX:+UseG1GC -Xms256m -Xmx2048m
-
-
-
-
-
- org.jacoco
- jacoco-maven-plugin
-
-
- prepare-agent
-
- prepare-agent
-
-
-
- report
- install
-
- report
-
-
-
diff --git a/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServicesImpl.java b/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServicesImpl.java
index 2f542d34..4b33da2f 100644
--- a/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServicesImpl.java
+++ b/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServicesImpl.java
@@ -205,7 +205,8 @@ public List<String> retrieveTablePartitions(String project, String dataset, Stri
} catch (Exception ex) {
throw new RuntimeException(
String.format(
- "Problems while trying to retrieve table partitions (table: %s.%s.%s).",
+ "Problems while trying to retrieve table partitions"
+ + " (table: %s.%s.%s).",
project, dataset, table),
ex);
}
@@ -252,7 +253,8 @@ public Optional<Tuple2<String, StandardSQLTypeName>> retrievePartitionColumnName
} catch (Exception ex) {
throw new RuntimeException(
String.format(
- "Problems while trying to retrieve table partition's column name (table: %s.%s.%s).",
+ "Problems while trying to retrieve table partition's"
+ + " column name (table: %s.%s.%s).",
project, dataset, table),
ex);
}
@@ -260,10 +262,15 @@ public Optional<Tuple2<String, StandardSQLTypeName>> retrievePartitionColumnName
@Override
public TableSchema getTableSchema(String project, String dataset, String table) {
- return SchemaTransform.bigQuerySchemaToTableSchema(
- bigQuery.getTable(TableId.of(project, dataset, table))
- .getDefinition()
- .getSchema());
+ return Optional.ofNullable(bigQuery.getTable(TableId.of(project, dataset, table)))
+ .map(t -> t.getDefinition().getSchema())
+ .map(schema -> SchemaTransform.bigQuerySchemaToTableSchema(schema))
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ String.format(
+ "The provided table %s.%s.%s does not exists.",
+ project, dataset, table)));
}
@Override
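
With this change a missing table fails fast with a descriptive IllegalArgumentException instead of a NullPointerException from the getTable(...).getDefinition() chain. A hedged caller-side sketch (the QueryDataClient parameter type and the table coordinates are assumptions for illustration, not connector API):

```java
import com.google.api.services.bigquery.model.TableSchema;

/** Hedged sketch: surfacing the new failure mode during job setup. */
class SchemaLookup {
    static TableSchema schemaOrFail(
            com.google.cloud.flink.bigquery.services.BigQueryServices.QueryDataClient client) {
        try {
            // Coordinates are hypothetical placeholders.
            return client.getTableSchema("some-project", "some_dataset", "some_table");
        } catch (IllegalArgumentException iae) {
            // Previously this surfaced as a NullPointerException; now the error names the table.
            throw new IllegalStateException("Aborting job setup: " + iae.getMessage(), iae);
        }
    }
}
```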
diff --git a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java
index f06ac18a..af722ced 100644
--- a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java
+++ b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java
@@ -51,6 +51,8 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -82,16 +84,18 @@ public StorageReadClient getStorageClient(CredentialsOptions readOptions)
@Override
public QueryDataClient getQueryDataClient(CredentialsOptions readOptions) {
return new QueryDataClient() {
+
@Override
public List<String> retrieveTablePartitions(
String project, String dataset, String table) {
- return Arrays.asList("2023062811");
+ return Arrays.asList(
+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH")));
}
@Override
public Optional<Tuple2<String, StandardSQLTypeName>> retrievePartitionColumnName(
String project, String dataset, String table) {
- return Optional.of(Tuple2.of("number", StandardSQLTypeName.INT64));
+ return Optional.of(Tuple2.of("ts", StandardSQLTypeName.TIMESTAMP));
}
@Override
@@ -191,8 +195,9 @@ public void close() {}
public static final String SIMPLE_AVRO_SCHEMA_FIELDS_STRING =
" \"fields\": [\n"
- + " {\"name\": \"name\", \"type\": \"string\"},\n"
- + " {\"name\": \"number\", \"type\": \"long\"}\n"
+ + " {\"name\": \"name\", \"type\": \"string\"},\n"
+ + " {\"name\": \"number\", \"type\": \"long\"},\n"
+ + " {\"name\" : \"ts\", \"type\" : {\"type\" : \"long\",\"logicalType\" : \"timestamp-micros\"}}\n"
+ " ]\n";
public static final String SIMPLE_AVRO_SCHEMA_STRING =
"{\"namespace\": \"project.dataset\",\n"
@@ -225,6 +230,10 @@ public void close() {}
new TableFieldSchema()
.setName("number")
.setType("INTEGER")
+ .setMode("REQUIRED"),
+ new TableFieldSchema()
+ .setName("ts")
+ .setType("TIMESTAMP")
.setMode("REQUIRED")));
/** Represents the parameters needed for the Avro data generation. */
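
The faked schema now carries a timestamp-micros logical type, matching the new TIMESTAMP partition column. A quick standalone check of how Avro parses that field (sketch only; the schema string below is trimmed to the new field):

```java
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

/** Sketch: confirms the "ts" field parses with the timestamp-micros logical type. */
public class TsFieldCheck {
    public static void main(String[] args) {
        String schemaString =
                "{\"namespace\": \"project.dataset\", \"type\": \"record\", \"name\": \"table\","
                        + " \"fields\": [{\"name\": \"ts\", \"type\":"
                        + " {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}}]}";
        Schema tsSchema = new Schema.Parser().parse(schemaString).getField("ts").schema();
        // The logical type, not the raw long, drives how the value is interpreted downstream.
        System.out.println(
                LogicalTypes.timestampMicros().equals(tsSchema.getLogicalType())); // true
    }
}
```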
diff --git a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitStateTest.java b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitStateTest.java
index e71a4b82..74ca783b 100644
--- a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitStateTest.java
+++ b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitStateTest.java
@@ -32,6 +32,8 @@ public void testSplitStateTransformation() {
BigQuerySourceSplitState splitState = new BigQuerySourceSplitState(originalSplit);
assertThat(splitState.toBigQuerySourceSplit()).isEqualTo(originalSplit);
+ assertThat(splitState)
+ .isEqualTo(new BigQuerySourceSplitState(splitState.toBigQuerySourceSplit()));
}
@Test
diff --git a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/table/BigQueryDynamicTableSourceITCase.java b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/table/BigQueryDynamicTableSourceITCase.java
index 69c1ad38..f3832f39 100644
--- a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/table/BigQueryDynamicTableSourceITCase.java
+++ b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/table/BigQueryDynamicTableSourceITCase.java
@@ -211,7 +211,12 @@ public void testProject() {
@Test
public void testRestriction() {
- String sqlFilter = "id = 0 AND NOT optString IS NULL";
+ String sqlFilter =
+ "id = 0"
+ + " AND NOT optString IS NULL"
+ + " AND optString LIKE 's%'"
+ + " AND optDouble > -1"
+ + " AND optDouble <= 1.0 ";
tEnv.executeSql(createTestDDl(null));
Iterator<Row> collected =
diff --git a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/table/restrictions/BigQueryPartitionTest.java b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/table/restrictions/BigQueryPartitionTest.java
new file mode 100644
index 00000000..23697c02
--- /dev/null
+++ b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/table/restrictions/BigQueryPartitionTest.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright (C) 2023 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.flink.bigquery.table.restrictions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import java.util.List;
+
+/** */
+public class BigQueryPartitionTest {
+
+ @Test
+ public void testPartitionHour() {
+ List partitionIds = Lists.newArrayList("2023062822", "2023062823");
+ // ISO formatted dates as single quote string literals at the beginning of the hour.
+ List<String> expectedValues =
+ Lists.newArrayList("'2023-06-28 22:00:00'", "'2023-06-28 23:00:00'");
+ List<String> values =
+ BigQueryPartition.partitionValuesFromIdAndDataType(
+ partitionIds, StandardSQLTypeName.TIMESTAMP);
+
+ Assertions.assertThat(values).isEqualTo(expectedValues);
+ }
+
+ @Test
+ public void testPartitionDay() {
+ List partitionIds = Lists.newArrayList("20230628", "20230628");
+ // ISO formatted dates as single quote string literals.
+ List expectedValues = Lists.newArrayList("'2023-06-28'", "'2023-06-28'");
+ List values =
+ BigQueryPartition.partitionValuesFromIdAndDataType(
+ partitionIds, StandardSQLTypeName.DATETIME);
+
+ Assertions.assertThat(values).isEqualTo(expectedValues);
+ }
+
+ @Test
+ public void testPartitionMonth() {
+ List partitionIds = Lists.newArrayList("202306", "202307");
+ // ISO formatted dates as single quote string literals
+ List expectedValues = Lists.newArrayList("'2023-06'", "'2023-07'");
+ List values =
+ BigQueryPartition.partitionValuesFromIdAndDataType(
+ partitionIds, StandardSQLTypeName.DATE);
+
+ Assertions.assertThat(values).isEqualTo(expectedValues);
+ }
+
+ @Test
+ public void testPartitionYear() {
+ List<String> partitionIds = Lists.newArrayList("2023", "2022");
+ // ISO formatted dates as single quote string literals
+ List expectedValues = Lists.newArrayList("'2023'", "'2022'");
+ List values =
+ BigQueryPartition.partitionValuesFromIdAndDataType(
+ partitionIds, StandardSQLTypeName.TIMESTAMP);
+
+ Assertions.assertThat(values).isEqualTo(expectedValues);
+ }
+
+ @Test
+ public void testPartitionInteger() {
+ List<String> partitionIds = Lists.newArrayList("2023", "2022");
+ // Integer partition ids pass through as-is, with no quoting.
+ List<String> expectedValues = Lists.newArrayList("2023", "2022");
+ List<String> values =
+ BigQueryPartition.partitionValuesFromIdAndDataType(
+ partitionIds, StandardSQLTypeName.INT64);
+
+ Assertions.assertThat(values).isEqualTo(expectedValues);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongTemporalPartition() {
+ List partitionIds = Lists.newArrayList("202308101112");
+ BigQueryPartition.partitionValuesFromIdAndDataType(
+ partitionIds, StandardSQLTypeName.TIMESTAMP);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongArrayPartition() {
+ List<String> partitionIds = Lists.newArrayList("2023", "2022");
+ BigQueryPartition.partitionValuesFromIdAndDataType(partitionIds, StandardSQLTypeName.ARRAY);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongStructPartition() {
+ List<String> partitionIds = Lists.newArrayList("2023", "2022");
+ BigQueryPartition.partitionValuesFromIdAndDataType(
+ partitionIds, StandardSQLTypeName.STRUCT);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongJsonPartition() {
+ List<String> partitionIds = Lists.newArrayList("2023", "2022");
+ BigQueryPartition.partitionValuesFromIdAndDataType(partitionIds, StandardSQLTypeName.JSON);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongGeoPartition() {
+ List<String> partitionIds = Lists.newArrayList("2023", "2022");
+ BigQueryPartition.partitionValuesFromIdAndDataType(
+ partitionIds, StandardSQLTypeName.GEOGRAPHY);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongBigNumPartition() {
+ List<String> partitionIds = Lists.newArrayList("2023", "2022");
+ BigQueryPartition.partitionValuesFromIdAndDataType(
+ partitionIds, StandardSQLTypeName.BIGNUMERIC);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongBoolPartition() {
+ List<String> partitionIds = Lists.newArrayList("2023", "2022");
+ BigQueryPartition.partitionValuesFromIdAndDataType(partitionIds, StandardSQLTypeName.BOOL);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongBytesPartition() {
+ List<String> partitionIds = Lists.newArrayList("2023", "2022");
+ BigQueryPartition.partitionValuesFromIdAndDataType(partitionIds, StandardSQLTypeName.BYTES);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongFloatPartition() {
+ List<String> partitionIds = Lists.newArrayList("2023", "2022");
+ BigQueryPartition.partitionValuesFromIdAndDataType(
+ partitionIds, StandardSQLTypeName.FLOAT64);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongStringPartition() {
+ List<String> partitionIds = Lists.newArrayList("2023", "2022");
+ BigQueryPartition.partitionValuesFromIdAndDataType(
+ partitionIds, StandardSQLTypeName.STRING);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongTimePartition() {
+ List<String> partitionIds = Lists.newArrayList("2023", "2022");
+ BigQueryPartition.partitionValuesFromIdAndDataType(partitionIds, StandardSQLTypeName.TIME);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongIntervalPartition() {
+ List<String> partitionIds = Lists.newArrayList("2023", "2022");
+ BigQueryPartition.partitionValuesFromIdAndDataType(
+ partitionIds, StandardSQLTypeName.INTERVAL);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongNumericPartition() {
+ List<String> partitionIds = Lists.newArrayList("2023", "2022");
+ BigQueryPartition.partitionValuesFromIdAndDataType(
+ partitionIds, StandardSQLTypeName.NUMERIC);
+ }
+}
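
These tests pin down how partition ids render as SQL literals; downstream, the rendered values end up on the right-hand side of a partition restriction. A hedged sketch of that composition (the helper below is illustrative, not connector API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Illustrative composition of rendered partition values into a row restriction. */
public class PartitionRestrictionSketch {
    static String restrictionFor(String column, List<String> renderedValues) {
        // Values arrive already quoted (or not) per BigQueryPartition's rendering rules.
        return renderedValues.stream()
                .map(value -> column + " = " + value)
                .collect(Collectors.joining(" OR "));
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("'2023-06-28 22:00:00'", "'2023-06-28 23:00:00'");
        // Prints: ts = '2023-06-28 22:00:00' OR ts = '2023-06-28 23:00:00'
        System.out.println(restrictionFor("ts", values));
    }
}
```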
diff --git a/flink-sql-connector-bigquery/pom.xml b/flink-sql-connector-bigquery/pom.xml
new file mode 100644
index 00000000..48f8af4f
--- /dev/null
+++ b/flink-sql-connector-bigquery/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-connector-bigquery-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-sql-connector-bigquery</artifactId>
+    <name>Flink : Connectors : SQL : Google BigQuery</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <japicmp.skip>true</japicmp.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-bigquery</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <artifactSet>
+                                <includes>
+                                    <include>*:*</include>
+                                </includes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>org.apache.flink.connector.bigquery.shaded.com.google</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/flink-sql-connector-bigquery/src/main/resources/META-INF/NOTICE b/flink-sql-connector-bigquery/src/main/resources/META-INF/NOTICE
new file mode 100644
index 00000000..27b8ca73
--- /dev/null
+++ b/flink-sql-connector-bigquery/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,5 @@
+flink-sql-connector-bigquery
+Copyright 2023 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
diff --git a/pom.xml b/pom.xml
index e6018eca..01840830 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,16 +56,19 @@ under the License.
false
3.0.0-1.16
-        <jacoco.version>0.8.10</jacoco.version>
+        <clover.version>4.4.1</clover.version>
1.7.36
2.17.2
flink-connector-bigquery-parent
+        <surefire.and.failsafe.report.dir>target/test-report</surefire.and.failsafe.report.dir>
         <module>flink-connector-bigquery</module>
+        <module>flink-sql-connector-bigquery</module>
+        <module>flink-connector-bigquery-examples</module>
@@ -400,11 +403,47 @@ under the License.
                 <groupId>org.commonjava.maven.plugins</groupId>
                 <artifactId>directory-maven-plugin</artifactId>
-            <plugin>
-                <groupId>org.jacoco</groupId>
-                <artifactId>jacoco-maven-plugin</artifactId>
-                <version>${jacoco.version}</version>
-            </plugin>
         </plugins>
     </pluginManagement>
 </build>
+
+    <profiles>
+        <profile>
+            <id>clover</id>
+            <build>
+                <pluginManagement>
+                    <plugins>
+                        <plugin>
+                            <groupId>org.openclover</groupId>
+                            <artifactId>clover-maven-plugin</artifactId>
+                            <version>${clover.version}</version>
+                            <configuration>
+                                <reportDescriptor>${maven.multiModuleProjectDirectory}/tools/maven/clover.xml</reportDescriptor>
+                                <targetPercentage>55%</targetPercentage>
+                                <excludes>
+                                    <exclude>**/com/google/cloud/flink/bigquery/common/config/*</exclude>
+                                    <exclude>**/com/google/cloud/flink/bigquery/common/utils/GoogleCredentialsSupplier.*</exclude>
+                                    <exclude>**/com/google/cloud/flink/bigquery/services/**Impl.*</exclude>
+                                    <exclude>**/com/google/cloud/flink/bigquery/source/config/*</exclude>
+                                </excludes>
+                            </configuration>
+                        </plugin>
+                        <plugin>
+                            <groupId>org.apache.maven.plugins</groupId>
+                            <artifactId>maven-surefire-plugin</artifactId>
+                            <configuration>
+                                <reportsDirectory>${surefire.and.failsafe.report.dir}</reportsDirectory>
+                            </configuration>
+                        </plugin>
+                    </plugins>
+                </pluginManagement>
+                <plugins>
+                    <plugin>
+                        <groupId>org.openclover</groupId>
+                        <artifactId>clover-maven-plugin</artifactId>
+                        <version>${clover.version}</version>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
diff --git a/tools/maven/clover.xml b/tools/maven/clover.xml
new file mode 100644
index 00000000..8149f176
--- /dev/null
+++ b/tools/maven/clover.xml
@@ -0,0 +1,45 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file