diff --git a/CHANGES.md b/CHANGES.md index cee473b4..82492383 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,10 @@ ## Next +* Support creation of new table in BigQuery sink. This is integrated with Datastream and Table/SQL API. +* Remove need for BigQuerySchemaProvider in BigQuery sink configs. +* Deprecate unbounded source. To be completely removed in next release. + ## 0.4.0 - 2024-11-04 * Support exactly-once consistency in BigQuery sink. This is integrated with Datastream and Table/SQL API. diff --git a/README.md b/README.md index 7215f7a6..8291b192 100644 --- a/README.md +++ b/README.md @@ -99,6 +99,7 @@ repository. | Flink 1.17.x | `com.google.cloud.flink:flink-1.17-connector-bigquery:0.2.0` | At-least Once Sink Support | | Flink 1.17.x | `com.google.cloud.flink:flink-1.17-connector-bigquery:0.3.0` | Table API Support | | Flink 1.17.x | `com.google.cloud.flink:flink-1.17-connector-bigquery:0.4.0` | Exactly Once Sink Support | +| Flink 1.17.x | `com.google.cloud.flink:flink-1.17-connector-bigquery:0.5.0` | Table Creation by Sink | #### GitHub @@ -109,7 +110,7 @@ Users can obtain the connector artifact from our [GitHub repository](https://git ```shell git clone https://github.com/GoogleCloudDataproc/flink-bigquery-connector cd flink-bigquery-connector -git checkout tags/0.4.0 +git checkout tags/0.5.0 mvn clean install -DskipTests -Pflink_1.17 ``` @@ -122,6 +123,16 @@ Maven artifacts are installed under `.m2/repository`. If only the jars are needed, then execute maven `package` instead of `install`. +#### Compilation Dependency (Maven) + +```xml + + com.google.cloud.flink + flink-1.17-connector-bigquery + 0.5.0 + +``` + ### Connector to Flink Compatibility | Connector tag \ Flink version | 1.15.x | 1.17.x | @@ -131,6 +142,7 @@ If only the jars are needed, then execute maven `package` instead of `install`. | 0.2.0 | ✓ | ✓ | | 0.3.0 | ✓ | ✓ | | 0.4.0 | ✓ | ✓ | +| 0.5.0 | ✓ | ✓ | ### Create a Google Cloud Dataproc cluster (Optional) @@ -154,37 +166,35 @@ Follow [this document](https://cloud.google.com/dataproc/docs/concepts/component | 0.2.0 | ✓ | ✓ | | 0.3.0 | ✓ | ✓ | | 0.4.0 | ✓ | ✓ | +| 0.5.0 | ✓ | ✓ | -## Usage -The connector can be used with Flink's Datastream and Table APIs in Java applications. -The source offers two read modes, bounded and unbounded. -The sink offers at-least-once delivery guarantee. -### Compiling against the connector - -#### Maven Dependency - -```xml - - com.google.cloud.flink - flink-1.17-connector-bigquery - 0.4.0 - -``` +## Table API -#### Relevant Files +* Table API is a high-level declarative API that allows users to describe what they want to do rather than how to do it. +* This results in simpler customer code and higher level pipelines that are more easily optimized in a managed service. +* The Table API is a superset of the SQL language and is specially designed for working with Apache Flink. +* It also allows language-embedded style support for queries in Java, Scala or Python besides the always available String values as queries in SQL. -* Sink can be created using `get` method at `com.google.cloud.flink.bigquery.sink.BigQuerySink`. -* Sink configuration is defined at `com.google.cloud.flink.bigquery.sink.BigQuerySinkConfig`. -* Source factory methods are defined at `com.google.cloud.flink.bigquery.source.BigQuerySource`. -* Source configuration is defined at `com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions`. 
-* BigQuery connection configuration is defined at `com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions`. -* Sample Flink application using connector is defined at `com.google.cloud.flink.bigquery.examples.BigQueryExample` for the Datastream API, - and at `com.google.cloud.flink.bigquery.examples.BigQueryTableExample` for the Table API and SQL. +### Catalog Tables +* Catalog Table usage helps hide the complexities of interacting with different external systems behind a common interface. +* In Apache Flink, a CatalogTable represents the unresolved metadata of a table stored within a catalog. +* It is an encapsulation of all the characteristics that would typically define an SQL CREATE TABLE statement. +* This includes the table's schema (column names and data types), partitioning information, constraints etc. + It doesn't contain the actual table data. +* SQL Command for Catalog Table Creation + ```java + CREATE TABLE sample_catalog_table + (name STRING) // Schema Details + WITH + ('connector' = 'bigquery', + 'project' = '', + 'dataset' = '', + 'table' = ''); + ``` -### Datastream API -#### Sink +## Sink Flink [Sink](https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/api/connector/sink2/Sink.html) is the base interface for developing a sink. With checkpointing enabled, it can offer at-least-once or exactly-once @@ -192,113 +202,165 @@ consistency. It uses BigQuery Storage's [default write stream](https://cloud.goo for at-least-once, and [buffered write stream](https://cloud.google.com/bigquery/docs/write-api#buffered_type) for exactly-once. +### Sink In Datastream API + ```java StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(checkpointInterval); -// Via DataStream API - BigQueryConnectOptions sinkConnectOptions = BigQueryConnectOptions.builder() .setProjectId(...) // REQUIRED .setDataset(...) // REQUIRED .setTable(...) // REQUIRED .build(); -BigQuerySchemaProvider schemaProvider = new BigQuerySchemaProviderImpl(sinkConnectOptions); DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE; // or EXACTLY_ONCE BigQuerySinkConfig sinkConfig = BigQuerySinkConfig.newBuilder() .connectOptions(sinkConnectOptions) // REQUIRED .streamExecutionEnvironment(env) // REQUIRED .deliveryGuarantee(deliveryGuarantee) // REQUIRED - .schemaProvider(schemaProvider) // REQUIRED .serializer(new AvroToProtoSerializer()) // REQUIRED + .enableTableCreation(...) // OPTIONAL + .partitionField(...) // OPTIONAL + .partitionType(...) // OPTIONAL + .partitionExpirationMillis(...) // OPTIONAL + .clusteredFields(...) // OPTIONAL + .region(...) // OPTIONAL .build(); -Sink sink = BigQuerySink.get(sinkConfig, env); +Sink sink = BigQuerySink.get(sinkConfig); ``` +### Sink In Table API + +```java +// final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +// env.enableCheckpointing(CHECKPOINT_INTERVAL); +// final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + +// Create the Config. +BigQuerySinkTableConfig sinkTableConfig = BigQuerySinkTableConfig.newBuilder() + .table(...) // REQUIRED + .project(...) // REQUIRED + .dataset(...) // REQUIRED + .streamExecutionEnvironment(env) // REQUIRED if deliveryGuarantee is EXACTLY_ONCE + .sinkParallelism(...) // OPTIONAL; Should be atmost 128 + .deliveryGuarantee(...) // OPTIONAL; Default is AT_LEAST_ONCE + .enableTableCreation(...) // OPTIONAL + .partitionField(...) // OPTIONAL + .partitionType(...) 
// OPTIONAL
+                .partitionExpirationMillis(...)     // OPTIONAL
+                .clusteredFields(...)               // OPTIONAL
+                .region(...)                        // OPTIONAL
+                .build();
+
+// Register the Sink Table
+// If the destination table already exists, then use:
+tEnv.createTable(
+        "bigQuerySinkTable",
+        BigQueryTableSchemaProvider.getTableDescriptor(sinkTableConfig));
+
+// Else, define the table schema (ensure this matches the schema of records sent to the sink)
+org.apache.flink.table.api.Schema tableSchema = ...
+// ... and use:
+tEnv.createTable(
+        "bigQuerySinkTable",
+        BigQueryTableSchemaProvider.getTableDescriptor(sinkTableConfig, tableSchema));
+
+// Insert entries in this sinkTable
+sourceTable.executeInsert("bigQuerySinkTable");
+```
+Note: For jobs running on a Dataproc cluster (submitted via "gcloud dataproc jobs submit"), explicitly call `await()` after `executeInsert` to
+wait for the job to complete.
+
+### Sink Configurations
+
+The connector supports a number of options to configure the sink.
+
+| Property                    | Data Type             | Description                                                                                                                                                                 |
+|-----------------------------|-----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `projectId`                 | String                | Google Cloud Project ID of the table. This config is required.                                                                                                              |
+| `dataset`                   | String                | Dataset containing the table. This config is required.                                                                                                                      |
+| `table`                     | String                | BigQuery table name (not the full ID). This config is required.                                                                                                             |
+| `credentialsOptions`        | CredentialsOptions    | Google credentials for connecting to BigQuery. This config is optional, and default behavior is to use the `GOOGLE_APPLICATION_CREDENTIALS` environment variable.           |
+| `deliveryGuarantee`         | DeliveryGuarantee     | Write consistency guarantee of the sink. This config is required.                                                                                                           |
+| `enableTableCreation`       | Boolean               | Allows the sink to create the destination BigQuery table (mentioned above) if it doesn't already exist. This config is optional.                                            |
+| `partitionField`            | String                | Column used to partition the new sink table. This config is optional, and considered only if enableTableCreation is true.                                                   |
+| `partitionType`             | TimePartitioning.Type | Time-partitioning granularity (for example, DAY) of the new sink table. This config is optional, and considered only if enableTableCreation is true.                        |
+| `partitionExpirationMillis` | Long                  | Expiration time of partitions in the new sink table. This config is optional, and considered only if enableTableCreation is true.                                           |
+| `clusteredFields`           | List<String>          | Columns used for clustering the new sink table. This config is optional, and considered only if enableTableCreation is true.                                                |
+| `region`                    | String                | BigQuery region in which to create the dataset (mentioned above) if it doesn't already exist. This config is optional, and considered only if enableTableCreation is true.  |
+| `sinkParallelism`           | Integer               | Sink's parallelism. This config is optional, and available only when the sink is used with the Table API.                                                                   |
+
+While this sink offers limited configurability at table creation, note that almost all
+configurations or modifications to a BigQuery table are possible after the table has been created. For example, adding primary keys requires
+[this simple SQL query](https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#alter_table_add_primary_key_statement), as in the sketch below.
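+For illustration only, a minimal sketch of running that DDL with the BigQuery Java client library (`BigQueryOptions` and `QueryJobConfiguration` from `com.google.cloud:google-cloud-bigquery`); the project, dataset, table and key column names are placeholders.
+
+```java
+// Add a primary key to the table created by the sink; BigQuery primary keys must be NOT ENFORCED.
+// Note: BigQuery.query(...) throws InterruptedException, so invoke this from a method that handles it.
+BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
+bigquery.query(
+        QueryJobConfiguration.of(
+                "ALTER TABLE `your-project.your_dataset.your_table` "
+                        + "ADD PRIMARY KEY (id) NOT ENFORCED"));
+```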
+Check out BigQuery SQL's [DDL](https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language), +[DML](https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax) and +[DCL](https://cloud.google.com/bigquery/docs/reference/standard-sql/data-control-language) syntax for details. + +### Sink Details [MUST READ] + * BigQuery sinks require that checkpoint is enabled. +* When using a BigQuery sink, checkpoint timeout should be liberal (1+ minutes). This is because sink writers are + [throttled](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/05fe7b14f2dc688bc808f553c3f863ba2380e317/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/throttle/BigQueryWriterThrottler.java#L26) + before they start sending data to BigQuery. Depending on the sink's parallelism, this throttling can be as high as 40 seconds. + Throttling is necessary to gracefully handle BigQuery's rate limiting on certain APIs used by the sink writers. Note that this only + affects the first checkpoint. * Delivery guarantee can be [at-least-once](https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/connector/base/DeliveryGuarantee.html#AT_LEAST_ONCE) or [exactly-once](https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/connector/base/DeliveryGuarantee.html#EXACTLY_ONCE). -* [BigQueryConnectOptions](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/common/config/BigQueryConnectOptions.java) - stores information needed to connect to a BigQuery table. +* The at-least-once sink enables BigQuery client [multiplexing](https://cloud.google.com/bigquery/docs/write-api-streaming#use_multiplexing) + by default, which optimizes usage of BigQuery write APIs. This is not possible in exactly-once sink. * [AvroToProtoSerializer](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/serializer/AvroToProtoSerializer.java) - is the only out-of-the-box serializer offered for now. It expects data to arrive at the sink as avro's GenericRecord. Other - relevant data formats will be supported soon. Also, users can create their own implementation of + is the only out-of-the-box serializer offered for Datastream API. It expects data to arrive at the sink as avro's GenericRecord. + Users can create their own implementation of [BigQueryProtoSerializer](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/serializer/BigQueryProtoSerializer.java) for other data formats. -* [BigQuerySchemaProvider](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/serializer/BigQuerySchemaProvider.java) - exposes schema related information about the BigQuery table. This is needed by the sink to write data to BigQuery tables. It - can also be used by the serializer if needed (for instance, the AvroToProtoSerializer uses BigQuery table's schema). * Flink cannot automatically serialize avro's GenericRecord, hence users must explicitly specify type information - when using the AvroToProtoSerializer. 
Check Flink's [blog on non-trivial serialization](https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/connector/base/DeliveryGuarantee.html#AT_LEAST_ONCE). - Note that the avro schema needed here can be obtained from BigQuerySchemaProvider. + when maintaining records in avro format. Check Flink's [blog on non-trivial serialization](https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/connector/base/DeliveryGuarantee.html#AT_LEAST_ONCE). + Note that avro schema of an existing BigQuery table can be obtained from + [BigQuerySchemaProviderImpl](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/serializer/BigQuerySchemaProviderImpl.java). * The maximum parallelism of BigQuery sinks has been capped at **128**. This is to respect BigQuery storage [write quotas](https://cloud.google.com/bigquery/quotas#write-api-limits) while adhering to [best usage practices](https://cloud.google.com/bigquery/docs/write-api-best-practices). Users should either set [sink level parallelism](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/execution/parallel/#operator-level) explicitly, or ensure that default job level parallelism is under 128. -* BigQuerySinkConfig requires the StreamExecutionEnvironment if delivery guarantee is exactly-once. - **Restart strategy must be explicitly set in the StreamExecutionEnvironment**. +* BigQuerySinkConfig requires the StreamExecutionEnvironment if delivery guarantee is exactly-once. + **Restart strategy must be explicitly set in the StreamExecutionEnvironment**. This is to [validate](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/92db3690c741fb2cdb99e28c575e19affb5c8b69/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQuerySinkConfig.java#L185) the [restart strategy](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/state/task_failure_recovery/). Users are recommended to choose their application's restart strategy wisely, to avoid incessant retries which can potentially exhaust your BigQuery resource quota, and disrupt the BigQuery Storage API backend. Regardless of which strategy is - adopted, the restarts must be finite and graciously spaced. - **Using fixed delay restart is strongly discouraged, as a potential crash loop can quickly evaporate your Biguery resource quota.** + adopted, the restarts must be finite and graciously spaced. + **Using fixed delay restart is strongly discouraged, as a potential crash loop can quickly evaporate your project's Biguery resource quota.** * BigQuery sink's exactly-once mode follows the `Two Phase Commit` protocol. All data between two checkpoints is buffered in BigQuery's write streams, and committed to the destination BigQuery table upon successful checkpoint completion. This means that new data will be visible in the BigQuery table only at checkpoints. -* For exactly-once write consistency, checkpoint timeout should be liberal. This is because sink writers are [throttled](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/05fe7b14f2dc688bc808f553c3f863ba2380e317/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/throttle/WriteStreamCreationThrottler.java#L26) - before they start sending data to BigQuery. 
Depending on the sink's parallelism, this throttling can be as high as 40 seconds. - The reason being BigQuery's storage write APIs require a "slow start", where write streams are created at a steady rate of - 3 per second, and existing write streams should have reasonable utilization before new write streams are created. Note that - this applies to the initial checkpoint only, when sink writers start sending data to BigQuery for the first time. * If a data record cannot be serialized by BigQuery sink, then the record is dropped with a warning getting logged. In future, we plan to use dead letter queues to capture such data. **Important:** Please refer to [data ingestion pricing](https://cloud.google.com/bigquery/pricing#data_ingestion_pricing) to understand the BigQuery Storage Write API pricing. -#### Source: Unbounded +### Relevant Files -A timestamp [partitioned table](https://cloud.google.com/bigquery/docs/partitioned-tables) will be continuously checked for -“completed” partitions, which the connector will stream into the Flink application. +* Sink can be created using `get` method at `com.google.cloud.flink.bigquery.sink.BigQuerySink`. +* Sink configuration for Datastream API is defined at `com.google.cloud.flink.bigquery.sink.BigQuerySinkConfig`. +* Sink configuration for Table/SQL API is defined at `com.google.cloud.flink.bigquery.table.config.BigQuerySinkTableConfig`. +* BigQuery connection configuration is defined at `com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions`. +* Sample Flink application using connector is defined at `com.google.cloud.flink.bigquery.examples.BigQueryExample` for the Datastream API, + and at `com.google.cloud.flink.bigquery.examples.BigQueryTableExample` for the Table API and SQL. -```java -BigQuerySource source = - BigQuerySource.streamAvros( - BigQueryReadOptions.builder() - .setBigQueryConnectOptions( - BigQueryConnectOptions.builder() - .setProjectId(...) - .setDataset(...) - .setTable(...) - .build()) - .setColumnNames(...) // OPTIONAL - .setLimit(...) // OPTIONAL - .setMaxRecordsPerSplitFetch(...) // OPTIONAL - .setMaxStreamCount(...) // OPTIONAL - .setOldestPartitionId(...) // OPTIONAL - .setPartitionDiscoveryRefreshIntervalInMinutes(...) // OPTIONAL - .setRowRestriction(...) // OPTIONAL - .setSnapshotTimestampInMillis(...) // OPTIONAL - .build()); -``` -* A partition is considered “complete” if the table’s write buffer’s oldest entry’s ingestion time is after the partition’s - end. -* If the table’s write buffer is empty, then a partition is considered complete if `java.time.Instant.now()` is after the - partition’s end. -* This approach is susceptible to out-of-order data, and we plan to replace it with a lateness tolerance beyond the - partition’s end in future releases. +## Source -#### Source: Bounded +### Bounded Source In Datastream API A table will be read once, and its rows at the time will be streamed into the Flink application. ```java +// Sets source boundedness to Boundedness.BOUNDED BigQuerySource source = BigQuerySource.readAvros( BigQueryReadOptions.builder() @@ -317,7 +379,7 @@ BigQuerySource source = .build()); ``` -##### Query +#### Bounded Source Using BigQuery Query In Datastream API A SQL query will be executed in the GCP project, and its [view](https://cloud.google.com/bigquery/docs/views-intro) will be streamed into the Flink application. 
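+For illustration, a minimal sketch of such a query read is shown below; the query text, project ID and limit are placeholders, and the factory assumed here is the `readAvrosFromQuery` overloads on `BigQuerySource`.
+
+```java
+String query = "SELECT name, SUM(number) AS total FROM `project.dataset.table` GROUP BY name";
+
+// With an optional record limit ...
+BigQuerySource<GenericRecord> bqSource =
+        BigQuerySource.readAvrosFromQuery(query, "your-gcp-project", 1000);
+
+// ... or without one.
+// BigQuerySource<GenericRecord> bqSource =
+//         BigQuerySource.readAvrosFromQuery(query, "your-gcp-project");
+```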
@@ -339,7 +401,73 @@ BigQuerySource bqSource = [manage these views](https://cloud.google.com/bigquery/docs/managing-views) on their own, until future releases expose a configuration in the connector to delete them or assign a time-to-live. -##### Connector Source Configurations +### Source In Table API + +```java +// Note: Users must create and register a catalog table before reading and writing to them. + +// final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +// env.enableCheckpointing(CHECKPOINT_INTERVAL); +// final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + +// Create the Config. +BigQueryTableConfig readTableConfig = new BigQueryReadTableConfig.Builder() + .table(...) // REQUIRED + .project(...) // REQUIRED + .dataset(...) // REQUIRED + .boundedness(...) // DEPRECATED: Defaults to Boundedness.BOUNDED + .partitionDiscoveryInterval(...) // DEPRECATED: Use if boundedness is CONTINUOUS_UNBOUNDED + .limit(...) // OPTIONAL + .columnProjection(...) // OPTIONAL + .snapshotTimestamp(...) // OPTIONAL + .rowRestriction(...) // OPTIONAL + .build(); + +// Create the catalog table. +tEnv.createTable( + "bigQuerySourceTable", + BigQueryTableSchemaProvider.getTableDescriptor(readTableConfig)); +Table sourceTable = tEnv.from("bigQuerySourceTable"); + +// Fetch entries in this sourceTable +sourceTable = sourceTable.select($("*")); +``` + +### [DEPRECATED] Unbounded Source In Datastream API + +A timestamp [partitioned table](https://cloud.google.com/bigquery/docs/partitioned-tables) will be continuously checked for +“completed” partitions, which the connector will stream into the Flink application. + +```java +// Sets source boundedness to Boundedness.CONTINUOUS_UNBOUNDED +BigQuerySource source = + BigQuerySource.streamAvros( + BigQueryReadOptions.builder() + .setBigQueryConnectOptions( + BigQueryConnectOptions.builder() + .setProjectId(...) + .setDataset(...) + .setTable(...) + .build()) + .setColumnNames(...) // OPTIONAL + .setLimit(...) // OPTIONAL + .setMaxRecordsPerSplitFetch(...) // OPTIONAL + .setMaxStreamCount(...) // OPTIONAL + .setOldestPartitionId(...) // OPTIONAL + .setPartitionDiscoveryRefreshIntervalInMinutes(...) // OPTIONAL + .setRowRestriction(...) // OPTIONAL + .setSnapshotTimestampInMillis(...) // OPTIONAL + .build()); +``` + +* A partition is considered “complete” if the table’s write buffer’s oldest entry’s ingestion time is after the partition’s + end. +* If the table’s write buffer is empty, then a partition is considered complete if `java.time.Instant.now()` is after the + partition’s end. +* This approach is susceptible to out-of-order data, and we plan to replace it with a lateness tolerance beyond the + partition’s end in future releases. + +### Source Configurations The connector supports a number of options to configure the source. @@ -347,7 +475,7 @@ The connector supports a number of options to configure the source. |----------------------------------------------|--------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `projectId` | String | Google Cloud Project ID of the table. This config is required, and assumes no default value. | | `dataset` | String | Dataset containing the table. 
This config is required for standard tables, but not when loading query results. | -| `table` | String | BigQuery table in the format `[[projectId:]dataset.]table`. This config is required for standard tables, but not when loading query results. | +| `table` | String | BigQuery table name (not the full ID). This config is required for standard tables, but not when loading query results. | | `credentialsOptions` | CredentialsOptions | Google credentials for connecting to BigQuery. This config is optional, and default behavior is to use the `GOOGLE_APPLICATION_CREDENTIALS` environment variable.
**Note**: The query bounded source only uses default application credentials. | | `query` | String | BigQuery SQL query used in the bounded query source. This config is not required for bounded table or unbounded source. | | `columnNames` | List<String> | Columns to project from the table. This config is used in bounded table or unbounded source. If unspecified, all columns are fetched. | @@ -358,9 +486,10 @@ The connector supports a number of options to configure the source. | `snapshotTimeInMillis` | Long | Time (in milliseconds since epoch) for the BigQuery table snapshot to read. This config is used in bounded table or unbounded source. If unspecified, the latest snapshot is read. | | `oldestPartitionId` | String | Earliest table partition to consider for unbounded reads. This config is used in unbounded source. If unspecified, all partitions are read. | | `partitionDiscoveryRefreshIntervalInMinutes` | Integer | Periodicity (in minutes) of partition discovery in table. This config is used in unbounded source. If unspecified, the default value used is 10 minutes. | +| `boundedness` | Boundedness | Enum value indicating boundedness of the source. Only used with Flink's Table API, when setting BigQueryReadTableConfig. Default Value is `Boundedness.BOUNDED` | -#### Datatypes +### Datatypes All the current BigQuery datatypes are being handled when transforming data from BigQuery to Avro’s `GenericRecord`. @@ -385,130 +514,17 @@ All the current BigQuery datatypes are being handled when transforming data from | `TIME` | `STRING`, `LONG` | | `JSON` | `STRING` | -### Table API Support -* Table API is a high-level declarative API that allows users to describe what they want to do rather than how to do it. -* This results in simpler customer code and higher level pipelines that are more easily optimized in a managed service. -* The Table API is a superset of the SQL language and is specially designed for working with Apache Flink. -* It also allows language-embedded style support for queries in Java, Scala or Python besides the always available String values as queries in SQL. - -#### Sink -Please check sink's [documentation](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/README.md#datastream-api) -under Datastream API before proceeding. -```java -// Note: Users must create and register a catalog table before reading and writing to them. -// Schema of the source and sink catalog table must be the same - -// final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -// env.enableCheckpointing(CHECKPOINT_INTERVAL); -// final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); +### Relevant Files -// Create the Config. -BigQueryTableConfig sinkTableConfig = BigQuerySinkTableConfig.newBuilder() - .table(...) // REQUIRED - .project(...) // REQUIRED - .dataset(...) // REQUIRED - .streamExecutionEnvironment(env) // REQUIRED if deliveryGuarantee is EXACTLY_ONCE - .sinkParallelism(...) // OPTIONAL; Should be atmost 128 - .deliveryGuarantee(...) // OPTIONAL; Default is AT_LEAST_ONCE - .build(); - -// Register the Sink Table -tEnv.createTable( - "bigQuerySinkTable", - BigQueryTableSchemaProvider.getTableDescriptor(sinkTableConfig)); - -// Insert entries in this sinkTable -sourceTable.executeInsert("bigQuerySinkTable"); -``` -Note: For jobs running on a dataproc cluster, via "gcloud dataproc submit", explicitly call `await()` after `executeInsert` to -wait for the job to complete. 
- -#### Source -```java -// Note: Users must create and register a catalog table before reading and writing to them. - -// final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -// env.enableCheckpointing(CHECKPOINT_INTERVAL); -// final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - -// Create the Config. -BigQueryTableConfig readTableConfig = new BigQueryReadTableConfig.Builder() - .table(...) // REQUIRED - .project(...) // REQUIRED - .dataset(...) // REQUIRED - .partitionDiscoveryInterval(...) // OPTIONAL; only in CONTINUOUS_UNBOUNDED source - .boundedness(...) // OPTIONAL; Defaults to Boundedness.BOUNDED - .limit(...) // OPTIONAL - .columnProjection(...) // OPTIONAL - .snapshotTimestamp(...) // OPTIONAL - .rowRestriction(...) // OPTIONAL - .build(); - -// Create the catalog table. -tEnv.createTable( - "bigQuerySourceTable", - BigQueryTableSchemaProvider.getTableDescriptor(readTableConfig)); -Table sourceTable = tEnv.from("bigQuerySourceTable"); +* Source factory methods are defined at `com.google.cloud.flink.bigquery.source.BigQuerySource`. +* Source configuration for Datastream API is defined at `com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions`. +* Source configuration for Table/SQL API is defined at `com.google.cloud.flink.bigquery.table.config.BigQueryReadTableConfig`. +* BigQuery connection configuration is defined at `com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions`. +* Sample Flink application using connector is defined at `com.google.cloud.flink.bigquery.examples.BigQueryExample` for the Datastream API, + and at `com.google.cloud.flink.bigquery.examples.BigQueryTableExample` for the Table API and SQL. -// Fetch entries in this sourceTable -sourceTable = sourceTable.select($("*")); -``` -#### More Details: -* Input and Output tables (catalog tables) must be registered in the TableEnvironment. -* The schema of the registered table must match the schema of the query. -* Boundedness must be either `Boundedness.CONTINUOUS_UNBOUNDED` or `Boundedness.BOUNDED`. -* Checkpointing must be enabled as mentioned above. Delivery guarantee must be at-least-once. -* [BigQueryTableConfig](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/table/config/BigQueryTableConfig.java) stores information needed to connect to a BigQuery table. It could even be used to obtain the TableDescriptor required for the creation of Catalog Table.
Please refer to: - * [BigQueryReadTableConfig](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/table/config/BigQueryReadTableConfig.java) for more details on available read configurations. - * [BigQuerySinkTableConfig](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/table/config/BigQuerySinkTableConfig.java) for more details on available sink configurations. -* [RowDataToProtoSerializer](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/serializer/RowDataToProtoSerializer.java) is offered for serialization of `RowData` (since Table API read/writes `RowData` format records) records to BigQuery Proto Rows. This out-of-box serializer is automatically provided to the sink during runtime. -* [BigQueryTableSchemaProvider](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/serializer/BigQueryTableSchemaProvider.java) is a helper class which contains the method `getTableDescriptor()` which could be used to obtain a [TableDescriptor](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableDescriptor.html) for creation of catalog table via `BigQueryTableConfig` (`BigQuerySinkTableConfig` for sink options and `BigQueryReadTableConfig` for read options). -Users could also create their own catalog tables; provided the schema of the registered table, and the associated BigQuery table is the same. -* The connector supports a number of options to configure. - -| Property | Data Type | Description | Availability | -|----------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------| -| `projectId` | String | Google Cloud Project ID of the table. This config is required, and assumes no default value. | `BigQueryReadTableConfig`, `BigQuerySinkTableConfig` | -| `dataset` | String | Dataset containing the table. This config is required for standard tables, but not when loading query results. | `BigQueryReadTableConfig`, `BigQuerySinkTableConfig` | -| `table` | String | BigQuery table. This config is required for standard tables, but not when loading query results. | `BigQueryReadTableConfig`, `BigQuerySinkTableConfig` | -| `credentialAccessToken` | String | [Google Access token](https://cloud.google.com/docs/authentication/token-types#access) for connecting to BigQuery. This config is optional, and default behavior is to use the `GOOGLE_APPLICATION_CREDENTIALS` environment variable. | `BigQueryReadTableConfig`, `BigQuerySinkTableConfig` | -| `credentialFile` | String | [Google credentials](https://developers.google.com/workspace/guides/create-credentials) for connecting to BigQuery. This config is optional, and default behavior is to use the `GOOGLE_APPLICATION_CREDENTIALS` environment variable. 
| `BigQueryReadTableConfig`, `BigQuerySinkTableConfig` | -| `credentialKey` | String | [Google credentials Key](https://cloud.google.com/docs/authentication/api-keys) for connecting to BigQuery. This config is optional, and default behavior is to use the `GOOGLE_APPLICATION_CREDENTIALS` environment variable. | `BigQueryReadTableConfig`, `BigQuerySinkTableConfig` | -| `limit` | Integer | Maximum number of rows to read from table. This config is used in all source types. If unspecified, all rows are fetched. | `BigQueryReadTableConfig` | -| `rowRestriction` | String | BigQuery SQL query for row filter pushdown. This config is used in bounded table or unbounded source. If unspecified, all rows are fetched. | `BigQueryReadTableConfig` | -| `columnProjection` | String | Columns (comma separated list of values) to project from the table. This config is used in bounded table or unbounded source. If unspecified, all columns are fetched. | `BigQueryReadTableConfig` | -| `maxStreamCount` | Integer | Maximum read streams to open during a read session. BigQuery can return a lower number of streams than specified based on internal optimizations. This config is used in bounded table or unbounded source. If unspecified, this config is not set and BigQuery has complete control over the number of read streams created. | `BigQueryReadTableConfig` | -| `snapshotTimeInMillis` | Long | Time (in milliseconds since epoch) for the BigQuery table snapshot to read. This config is used in bounded table or unbounded source. If unspecified, the latest snapshot is read. | `BigQueryReadTableConfig` | -| `partitionDiscoveryRefreshIntervalInMinutes` | Integer | Periodicity (in minutes) of partition discovery in table. This config is used in unbounded source. If unspecified, the default value used is 10 minutes. | `BigQueryReadTableConfig` | -| `sinkParallelism` | Integer | Integer value indicating the parallelism for the sink. This config is used in unbounded source and is optional. If unspecified, the application decides the optimal parallelism.
Maximum value: 128. | `BigQuerySinkTableConfig` | -| `boundedness` | Boundedness | Enum value indicating boundedness of the source.
Possible values: `Boundedness.CONTINUOUS_UNBOUNDED` or `Boundedness.BOUNDED`.
Default Value: `Boundedness.BOUNDED` | `BigQueryReadTableConfig` | -| `deliveryGuarantee` | DeliveryGuarantee | Enum value indicating delivery guarantee of the source.
Possible values: `DeliveryGuarantee.EXACTLY_ONCE` or `DeliveryGuarantee.AT_LEAST_ONCE`.
Default Value: `DeliveryGuarantee.AT_LEAST_ONCE` | `BigQueryReadTableConfig` | -* Limitations: - * Inability to read and then write `TIME` type BigQuery records. Reading `TIME` type records and subsequently writing them to BigQuery would result in an error due to misconfigured types between -BigQuery and Flink's RowData.
This misconfiguration only happens when BigQuery is used as both the source and sink, connector works as expected for correctly formatted `RowData` records read from other sources. - * Incorrect value obtained during read and write of `BIGNUMERIC` type BigQuery Records. Reading `BIGNUMERIC` type records from a BigQuery table and subsequently writing them to -BigQuery would result in incorrect value being written to BigQuery as Flink's RowData does not support NUMERIC Types with precision more than 38 (BIGNUMERIC supports precision up to 76).
This mismatch only occurs due to bigquery's support for NUMERIC values with > 38 precision. The connector works as expected for other sources(even BigQuery) within the permitted(up to 38) range. - * Supports only `INSERT` type operations such as `SELECT`/`WHERE`, `UNION`, `JOIN`, etc. - -#### Catalog Tables: -* Catalog Table usage helps hide the complexities of interacting with different external systems behind a common interface. -* In Apache Flink, a CatalogTable represents the unresolved metadata of a table stored within a catalog. -* It is an encapsulation of all the characteristics that would typically define an SQL CREATE TABLE statement. -* This includes the table's schema (column names and data types), partitioning information, constraints etc. - It doesn't contain the actual table data. -* SQL Command for Catalog Table Creation - ```java - CREATE TABLE sample_catalog_table - (name STRING) // Schema Details - WITH - ('connector' = 'bigquery', - 'project' = '', - 'dataset' = '', - 'table' = ''); - ``` - -### Flink Metrics +## Flink Metrics Apache Flink allows collecting metrics internally to better understand the status of jobs and clusters during the development process. Each operator in Flink maintains its own set of metrics, @@ -524,6 +540,7 @@ supports collection and reporting of the following metrics in BigQuery sink: | `numberOfRecordsWrittenToBigQuerySinceCheckpoint` | Counter to keep track of the number of records successfully written to BigQuery since the last checkpoint. | At-least Once Sink | | `numberOfRecordsBufferedByBigQuerySinceCheckpoint` | Counter to keep track of the number of records currently buffered by the Storage Write API stream before committing them to the BigQuery Table. These records will be added to the Table following [Two Phase Commit Protocol's](https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/Committer.html) `commit()` invocation. | Exactly Once Sink | + ## Example Application The `flink-1.17-connector-bigquery-examples` and `flink-1.17-connector-bigquery-table-api-examples` modules offer a sample Flink application powered by the connector. @@ -532,8 +549,11 @@ and at `com.google.cloud.flink.bigquery.examples.BigQueryTableExample` for the T It offers an intuitive hands-on application with elaborate guidance to test out the connector and its various configurations. + ## FAQ +Detailed guide for known issues and troubleshooting can be found [here](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/TROUBLESHOOT.md). + ### What is the pricing for the Storage API? See the [BigQuery Pricing Documentation](https://cloud.google.com/bigquery/pricing#storage-api). diff --git a/cloudbuild/nightly/cloudbuild.yaml b/cloudbuild/nightly/cloudbuild.yaml index bc359fae..4e10a716 100644 --- a/cloudbuild/nightly/cloudbuild.yaml +++ b/cloudbuild/nightly/cloudbuild.yaml @@ -101,10 +101,10 @@ steps: # 4. Start the nested schema table e2e test. - name: 'gcr.io/$PROJECT_ID/dataproc-flink-bigquery-connector-nightly' - id: 'e2e-bounded-nested-schema-table-test' + id: 'e2e-bounded-nested-schema-test' waitFor: ['create-clusters-bounded-small-table'] entrypoint: 'bash' - args: ['/workspace/cloudbuild/nightly/nightly.sh', 'e2e_bounded_nested_schema_table_test'] + args: ['/workspace/cloudbuild/nightly/nightly.sh', 'e2e_bounded_nested_schema_test'] env: - 'GCS_JAR_LOCATION=${_GCS_JAR_LOCATION}' - 'PROJECT_ID=${_PROJECT_ID}' @@ -120,7 +120,7 @@ steps: # 5. Table API nested schema table e2e test. 
- name: 'gcr.io/$PROJECT_ID/dataproc-flink-bigquery-connector-nightly' id: 'e2e-bounded-table-api-nested-schema-test' - waitFor: ['e2e-bounded-nested-schema-table-test'] + waitFor: ['e2e-bounded-nested-schema-test'] entrypoint: 'bash' args: ['/workspace/cloudbuild/nightly/nightly.sh', 'e2e_bounded_table_api_nested_schema_test'] env: diff --git a/cloudbuild/nightly/nightly.sh b/cloudbuild/nightly/nightly.sh index 9224f93c..9edca316 100644 --- a/cloudbuild/nightly/nightly.sh +++ b/cloudbuild/nightly/nightly.sh @@ -85,12 +85,13 @@ run_read_write_test(){ SINK_PARALLELISM=${11} # Take default value = false in case not provided. IS_SQL=${12:-False} + ENABLE_TABLE_CREATION=${13:-False} # Get the final region and the cluster name. export REGION=$(cat "$REGION_FILE") export CLUSTER_NAME=$(cat "$CLUSTER_FILE") - # Run the simple bounded write table test. - source cloudbuild/nightly/scripts/table_write.sh "$PROJECT_ID" "$CLUSTER_NAME" "$REGION" "$PROJECT_NAME" "$DATASET_NAME" "$SOURCE" "$DESTINATION_TABLE_NAME" "$IS_EXACTLY_ONCE_ENABLED" "$MODE" "$PROPERTIES" "$SINK_PARALLELISM" "$IS_SQL" + # Run the test. + source cloudbuild/nightly/scripts/table_write.sh "$PROJECT_ID" "$CLUSTER_NAME" "$REGION" "$PROJECT_NAME" "$DATASET_NAME" "$SOURCE" "$DESTINATION_TABLE_NAME" "$IS_EXACTLY_ONCE_ENABLED" "$MODE" "$PROPERTIES" "$SINK_PARALLELISM" "$IS_SQL" "$ENABLE_TABLE_CREATION" } # Function to run the test to check BQ Table Read and Write. @@ -144,75 +145,65 @@ case $STEP in ;; # Run the nested schema bounded e2e test. - e2e_bounded_nested_schema_table_test) - IS_EXACTLY_ONCE_ENABLED=False - run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_COMPLEX_SCHEMA_TABLE" "$TABLE_NAME_DESTINATION_COMPLEX_SCHEMA_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB" + e2e_bounded_nested_schema_test) + IS_SQL=False + ENABLE_TABLE_CREATION=True IS_EXACTLY_ONCE_ENABLED=True - run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_COMPLEX_SCHEMA_TABLE" "$TABLE_NAME_DESTINATION_COMPLEX_SCHEMA_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB" + run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_COMPLEX_SCHEMA_TABLE" "$TABLE_NAME_DESTINATION_COMPLEX_SCHEMA_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB" "$IS_SQL" "$ENABLE_TABLE_CREATION" exit ;; # Run the nested schema bounded Table API e2e test. 
e2e_bounded_table_api_nested_schema_test) IS_SQL=True + ENABLE_TABLE_CREATION=True IS_EXACTLY_ONCE_ENABLED=False - run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_COMPLEX_SCHEMA_TABLE" "$TABLE_NAME_DESTINATION_COMPLEX_SCHEMA_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB" "$IS_SQL" - IS_EXACTLY_ONCE_ENABLED=True - run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_COMPLEX_SCHEMA_TABLE" "$TABLE_NAME_DESTINATION_COMPLEX_SCHEMA_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB" "$IS_SQL" + run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_COMPLEX_SCHEMA_TABLE" "$TABLE_NAME_DESTINATION_COMPLEX_SCHEMA_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB" "$IS_SQL" "$ENABLE_TABLE_CREATION" exit ;; # Run the all datatypes bounded Table API e2e test. e2e_bounded_table_api_all_datatypes_test) IS_SQL=True + ENABLE_TABLE_CREATION=True IS_EXACTLY_ONCE_ENABLED=False - run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_ALL_DATATYPES_TABLE" "$TABLE_NAME_DESTINATION_ALL_DATATYPES_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB" "$IS_SQL" - IS_EXACTLY_ONCE_ENABLED=True - run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_ALL_DATATYPES_TABLE" "$TABLE_NAME_DESTINATION_ALL_DATATYPES_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB" "$IS_SQL" + run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_ALL_DATATYPES_TABLE" "$TABLE_NAME_DESTINATION_ALL_DATATYPES_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB" "$IS_SQL" "$ENABLE_TABLE_CREATION" exit ;; - # Run the query bounded e2e test. + # Run the query bounded e2e test. e2e_bounded_query_test) run_read_only_test_delete_cluster "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "" "" "$QUERY" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" exit ;; - # Run the large table O(GB's) bounded e2e test. + # Run the large table bounded e2e test. e2e_bounded_large_table_test) # Run the large table test. 
- IS_EXACTLY_ONCE_ENABLED=False - run_read_write_test "$PROJECT_ID" "$REGION_LARGE_TABLE_TEST_FILE" "$CLUSTER_LARGE_TABLE_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_LARGE_TABLE" "$TABLE_NAME_DESTINATION_LARGE_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_LARGE_BOUNDED_JOB" "$SINK_PARALLELISM_LARGE_BOUNDED_JOB" IS_EXACTLY_ONCE_ENABLED=True run_read_write_test_delete_cluster "$PROJECT_ID" "$REGION_LARGE_TABLE_TEST_FILE" "$CLUSTER_LARGE_TABLE_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_LARGE_TABLE" "$TABLE_NAME_DESTINATION_LARGE_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_LARGE_BOUNDED_JOB" "$SINK_PARALLELISM_LARGE_BOUNDED_JOB" exit ;; - # Run the large table O(GB's) bounded e2e test. + # Run the Table API large table bounded e2e test. e2e_bounded_table_api_large_table_test) # Run the large table test. IS_SQL=True IS_EXACTLY_ONCE_ENABLED=False run_read_write_test "$PROJECT_ID" "$REGION_TABLE_API_LARGE_TABLE_TEST_FILE" "$CLUSTER_TABLE_API_LARGE_TABLE_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_TABLE_API_LARGE_TABLE" "$TABLE_NAME_DESTINATION_TABLE_API_LARGE_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_LARGE_BOUNDED_JOB" "$SINK_PARALLELISM_LARGE_BOUNDED_JOB" "$IS_SQL" - IS_EXACTLY_ONCE_ENABLED=True - run_read_write_test_delete_cluster "$PROJECT_ID" "$REGION_TABLE_API_LARGE_TABLE_TEST_FILE" "$CLUSTER_TABLE_API_LARGE_TABLE_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_TABLE_API_LARGE_TABLE" "$TABLE_NAME_DESTINATION_TABLE_API_LARGE_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_LARGE_BOUNDED_JOB" "$SINK_PARALLELISM_LARGE_BOUNDED_JOB" "$IS_SQL" exit ;; - # Run the unbounded table e2e test. + # Run the unbounded e2e test. e2e_unbounded_test) IS_EXACTLY_ONCE_ENABLED=False - run_read_write_test "$PROJECT_ID" "$REGION_UNBOUNDED_TABLE_TEST_FILE" "$CLUSTER_UNBOUNDED_TABLE_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$GCS_SOURCE_URI" "$TABLE_NAME_DESTINATION_UNBOUNDED_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "unbounded" "$PROPERTIES_UNBOUNDED_JOB" "$SINK_PARALLELISM_UNBOUNDED_JOB" - IS_EXACTLY_ONCE_ENABLED=True run_read_write_test_delete_cluster "$PROJECT_ID" "$REGION_UNBOUNDED_TABLE_TEST_FILE" "$CLUSTER_UNBOUNDED_TABLE_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$GCS_SOURCE_URI" "$TABLE_NAME_DESTINATION_UNBOUNDED_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "unbounded" "$PROPERTIES_UNBOUNDED_JOB" "$SINK_PARALLELISM_UNBOUNDED_JOB" exit ;; - # Run the unbounded table e2e test. + # Run the Table API unbounded e2e test. 
e2e_table_api_unbounded_test) IS_SQL=True - IS_EXACTLY_ONCE_ENABLED=False - run_read_write_test "$PROJECT_ID" "$REGION_TABLE_API_UNBOUNDED_TABLE_TEST_FILE" "$CLUSTER_TABLE_API_UNBOUNDED_TABLE_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$GCS_SOURCE_URI" "$TABLE_NAME_DESTINATION_UNBOUNDED_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "unbounded" "$PROPERTIES_UNBOUNDED_JOB" "$SINK_PARALLELISM_UNBOUNDED_JOB" "$IS_SQL" IS_EXACTLY_ONCE_ENABLED=True run_read_write_test_delete_cluster "$PROJECT_ID" "$REGION_TABLE_API_UNBOUNDED_TABLE_TEST_FILE" "$CLUSTER_TABLE_API_UNBOUNDED_TABLE_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$GCS_SOURCE_URI" "$TABLE_NAME_DESTINATION_UNBOUNDED_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "unbounded" "$PROPERTIES_UNBOUNDED_JOB" "$SINK_PARALLELISM_UNBOUNDED_JOB" "$IS_SQL" exit diff --git a/cloudbuild/nightly/scripts/bounded_table_write.sh b/cloudbuild/nightly/scripts/bounded_table_write.sh index 82ba650d..694e85a3 100644 --- a/cloudbuild/nightly/scripts/bounded_table_write.sh +++ b/cloudbuild/nightly/scripts/bounded_table_write.sh @@ -17,11 +17,16 @@ PROPERTIES=$1 BOUNDED_JOB_SINK_PARALLELISM=$2 IS_SQL=$3 IS_EXACTLY_ONCE_ENABLED=$4 +ENABLE_TABLE_CREATION=$5 # We won't run this async as we can wait for a bounded job to succeed or fail. -# Create the destination table from the source table schema. -python3 cloudbuild/nightly/scripts/python-scripts/create_sink_table.py -- --project_name "$PROJECT_NAME" --dataset_name "$DATASET_NAME" --source_table_name "$SOURCE" --destination_table_name "$DESTINATION_TABLE_NAME" -# Set the expiration time to 1 hour. -bq update --expiration 3600 "$DATASET_NAME"."$DESTINATION_TABLE_NAME" +if [ "$ENABLE_TABLE_CREATION" == False ] +then + echo "Creating destination table before test" + # Create the destination table from the source table schema. + python3 cloudbuild/nightly/scripts/python-scripts/create_sink_table.py -- --project_name "$PROJECT_NAME" --dataset_name "$DATASET_NAME" --source_table_name "$SOURCE" --destination_table_name "$DESTINATION_TABLE_NAME" + # Set the expiration time to 1 hour. 
+ bq update --expiration 3600 "$DATASET_NAME"."$DESTINATION_TABLE_NAME" +fi # Run the sink JAR JOB -gcloud dataproc jobs submit flink --id "$JOB_ID" --jar="$GCS_JAR_LOCATION" --cluster="$CLUSTER_NAME" --region="$REGION" --properties="$PROPERTIES" -- --gcp-source-project "$PROJECT_NAME" --bq-source-dataset "$DATASET_NAME" --bq-source-table "$SOURCE" --gcp-dest-project "$PROJECT_NAME" --bq-dest-dataset "$DATASET_NAME" --bq-dest-table "$DESTINATION_TABLE_NAME" --sink-parallelism "$BOUNDED_JOB_SINK_PARALLELISM" --is-sql "$IS_SQL" --exactly-once "$IS_EXACTLY_ONCE_ENABLED" +gcloud dataproc jobs submit flink --id "$JOB_ID" --jar="$GCS_JAR_LOCATION" --cluster="$CLUSTER_NAME" --region="$REGION" --properties="$PROPERTIES" -- --gcp-source-project "$PROJECT_NAME" --bq-source-dataset "$DATASET_NAME" --bq-source-table "$SOURCE" --gcp-dest-project "$PROJECT_NAME" --bq-dest-dataset "$DATASET_NAME" --bq-dest-table "$DESTINATION_TABLE_NAME" --sink-parallelism "$BOUNDED_JOB_SINK_PARALLELISM" --is-sql "$IS_SQL" --exactly-once "$IS_EXACTLY_ONCE_ENABLED" --enable-table-creation "$ENABLE_TABLE_CREATION" \ No newline at end of file diff --git a/cloudbuild/nightly/scripts/table_write.sh b/cloudbuild/nightly/scripts/table_write.sh index 6b57ef9b..8ffe2e48 100644 --- a/cloudbuild/nightly/scripts/table_write.sh +++ b/cloudbuild/nightly/scripts/table_write.sh @@ -27,6 +27,7 @@ MODE=$9 PROPERTIES=${10} SINK_PARALLELISM=${11} IS_SQL=${12} +ENABLE_TABLE_CREATION=${13} set -euxo pipefail gcloud config set project "$PROJECT_ID" @@ -55,7 +56,7 @@ then echo "At least once is Enabled!" DESTINATION_TABLE_NAME="$DESTINATION_TABLE_NAME"-ALO fi - source cloudbuild/nightly/scripts/bounded_table_write.sh "$PROPERTIES" "$SINK_PARALLELISM" "$IS_SQL" "$IS_EXACTLY_ONCE_ENABLED" + source cloudbuild/nightly/scripts/bounded_table_write.sh "$PROPERTIES" "$SINK_PARALLELISM" "$IS_SQL" "$IS_EXACTLY_ONCE_ENABLED" "$ENABLE_TABLE_CREATION" elif [ "$MODE" == "unbounded" ] then echo [LOGS: "$PROJECT_NAME" "$SOURCE" Write Test in Unbounded Mode] Created JOB ID: "$JOB_ID" diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery-examples/src/main/java/com/google/cloud/flink/bigquery/examples/BigQueryExample.java b/flink-1.17-connector-bigquery/flink-connector-bigquery-examples/src/main/java/com/google/cloud/flink/bigquery/examples/BigQueryExample.java index 7a5814d4..c78dbafc 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery-examples/src/main/java/com/google/cloud/flink/bigquery/examples/BigQueryExample.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery-examples/src/main/java/com/google/cloud/flink/bigquery/examples/BigQueryExample.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.base.DeliveryGuarantee; @@ -31,11 +30,11 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; +import com.google.cloud.bigquery.TimePartitioning; import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions; import com.google.cloud.flink.bigquery.sink.BigQuerySink; import com.google.cloud.flink.bigquery.sink.BigQuerySinkConfig; import com.google.cloud.flink.bigquery.sink.serializer.AvroToProtoSerializer; -import 
com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider; import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProviderImpl; import com.google.cloud.flink.bigquery.source.BigQuerySource; import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions; @@ -62,10 +61,6 @@ * multiple sources, and in this example, we show a combination of bounded and unbounded * sources. The resulting records can be written to another BQ table, with allowed delivery * guarantees at-least-once or exactly-once.
- * The sequence of operations in bounded and hybrid pipelines are: source > flatMap > keyBy - * > sum > print. Hybrid pipeline also includes a window operation.
- * The sequence of operations in the unbounded pipeline is: source > keyBy > map > sink - *
* Flink command line format is:
* flink run {additional runtime params} {path to this jar}/BigQueryExample.jar
* --gcp-source-project {required; project ID containing the source table}
@@ -74,6 +69,8 @@ * --gcp-sink-project {required; project ID containing the sink table}
* --bq-sink-dataset {required; name of dataset containing the sink table}
* --bq-sink-table {required; name of table to write to}
+ * --sink-partition-field {optional; partition field for destination table. Also enables table + * creation}
* --mode {optional; source read type. Allowed values are bounded (default), * unbounded or hybrid}
* --agg-prop {required; record property to aggregate in Flink job. Value must be string}
@@ -128,6 +125,7 @@ public static void main(String[] args) throws Exception { + " --gcp-sink-project " + " --bq-sink-dataset " + " --bq-sink-table " + + " --sink-partition-field " + " --agg-prop " + " --mode " + " --restriction " @@ -185,22 +183,36 @@ public static void main(String[] args) throws Exception { } String recordPropertyForTimestamps; + String sinkProjectName; + String sinkDatasetName; + String sinkTableName; + String sinkPartitionField; switch (mode) { case "bounded": + sinkProjectName = parameterTool.getRequired("gcp-sink-project"); + sinkDatasetName = parameterTool.getRequired("bq-sink-dataset"); + sinkTableName = parameterTool.getRequired("bq-sink-table"); + sinkPartitionField = parameterTool.get("sink-partition-field", null); runBoundedFlinkJob( sourceProjectName, sourceDatasetName, sourceTableName, + sinkProjectName, + sinkDatasetName, + sinkTableName, + sinkPartitionField, recordPropertyToAggregate, rowRestriction, recordLimit, - checkpointInterval); + checkpointInterval, + sinkMode); break; case "unbounded": - String sinkProjectName = parameterTool.getRequired("gcp-sink-project"); - String sinkDatasetName = parameterTool.getRequired("bq-sink-dataset"); - String sinkTableName = parameterTool.getRequired("bq-sink-table"); + sinkProjectName = parameterTool.getRequired("gcp-sink-project"); + sinkDatasetName = parameterTool.getRequired("bq-sink-dataset"); + sinkTableName = parameterTool.getRequired("bq-sink-table"); recordPropertyForTimestamps = parameterTool.getRequired("ts-prop"); + sinkPartitionField = parameterTool.get("sink-partition-field", null); runStreamingFlinkJob( sourceProjectName, sourceDatasetName, @@ -208,6 +220,7 @@ public static void main(String[] args) throws Exception { sinkProjectName, sinkDatasetName, sinkTableName, + sinkPartitionField, recordPropertyToAggregate, recordPropertyForTimestamps, rowRestriction, @@ -257,35 +270,79 @@ private static void runQueryFlinkJob( } private static void runBoundedFlinkJob( - String projectName, - String datasetName, - String tableName, + String sourceProjectName, + String sourceDatasetName, + String sourceTableName, + String sinkProjectName, + String sinkDatasetName, + String sinkTableName, + String sinkPartitionField, String recordPropertyToAggregate, String rowRestriction, Integer limit, - Long checkpointInterval) + Long checkpointInterval, + DeliveryGuarantee sinkMode) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(checkpointInterval); + BigQueryConnectOptions sourceConnectOptions = + BigQueryConnectOptions.builder() + .setProjectId(sourceProjectName) + .setDataset(sourceDatasetName) + .setTable(sourceTableName) + .build(); + BigQuerySource source = BigQuerySource.readAvros( BigQueryReadOptions.builder() - .setBigQueryConnectOptions( - BigQueryConnectOptions.builder() - .setProjectId(projectName) - .setDataset(datasetName) - .setTable(tableName) - .build()) + .setBigQueryConnectOptions(sourceConnectOptions) .setRowRestriction(rowRestriction) .setLimit(limit) .build()); + + BigQueryConnectOptions sinkConnectOptions = + BigQueryConnectOptions.builder() + .setProjectId(sinkProjectName) + .setDataset(sinkDatasetName) + .setTable(sinkTableName) + .build(); + + BigQuerySinkConfig.Builder sinkConfigBuilder = + BigQuerySinkConfig.newBuilder() + .connectOptions(sinkConnectOptions) + .streamExecutionEnvironment(env) + .deliveryGuarantee(sinkMode) + .serializer(new AvroToProtoSerializer()); + + if (sinkPartitionField != null) { + 
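+            // Create the destination table from this job when a partition field is passed via
+            // --sink-partition-field; the new table is time-partitioned by DAY on that field.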
sinkConfigBuilder + .enableTableCreation(true) + .partitionField(sinkPartitionField) + .partitionType(TimePartitioning.Type.DAY); + } + env.fromSource(source, WatermarkStrategy.noWatermarks(), "BigQuerySource") - .flatMap(new FlatMapper(recordPropertyToAggregate)) - .keyBy(mappedTuple -> mappedTuple.f0) - .sum("f1") - .print(); + .keyBy(record -> record.get(recordPropertyToAggregate).hashCode() % 10000) + .map( + (GenericRecord genericRecord) -> { + genericRecord.put( + recordPropertyToAggregate, + genericRecord.get(recordPropertyToAggregate).toString() + + "_modified"); + return genericRecord; + }) + // Type hinting is required for Avro's GenericRecord to be serialized/deserialized + // when flowing through the job graph. + // Since the final record has same schema as the input from source, we use the same + // avro schema here. Ideally, users need to explicitly set the avro schema according + // to the record recieved by the sink. + .returns( + new GenericRecordAvroTypeInfo( + new BigQuerySchemaProviderImpl(sourceConnectOptions) + .getAvroSchema())) + .sinkTo(BigQuerySink.get(sinkConfigBuilder.build())); env.execute("Flink BigQuery Bounded Read Example"); } @@ -297,6 +354,7 @@ private static void runStreamingFlinkJob( String sinkProjectName, String sinkDatasetName, String sinkTableName, + String sinkPartitionField, String recordPropertyToAggregate, String recordPropertyForTimestamps, String rowRestriction, @@ -309,15 +367,16 @@ private static void runStreamingFlinkJob( DeliveryGuarantee sinkMode) throws Exception { + BigQueryConnectOptions sourceConnectOptions = + BigQueryConnectOptions.builder() + .setProjectId(sourceProjectName) + .setDataset(sourceDatasetName) + .setTable(sourceTableName) + .build(); BigQuerySource source = BigQuerySource.streamAvros( BigQueryReadOptions.builder() - .setBigQueryConnectOptions( - BigQueryConnectOptions.builder() - .setProjectId(sourceProjectName) - .setDataset(sourceDatasetName) - .setTable(sourceTableName) - .build()) + .setBigQueryConnectOptions(sourceConnectOptions) .setRowRestriction(rowRestriction) .setLimit(limit) .setOldestPartitionId(oldestPartition) @@ -335,17 +394,20 @@ private static void runStreamingFlinkJob( .setDataset(sinkDatasetName) .setTable(sinkTableName) .build(); - BigQuerySchemaProvider schemaProvider = new BigQuerySchemaProviderImpl(sinkConnectOptions); - BigQuerySinkConfig sinkConfig = + + BigQuerySinkConfig.Builder sinkConfigBuilder = BigQuerySinkConfig.newBuilder() .connectOptions(sinkConnectOptions) .streamExecutionEnvironment(env) .deliveryGuarantee(sinkMode) - .schemaProvider(schemaProvider) - .serializer(new AvroToProtoSerializer()) - .build(); + .serializer(new AvroToProtoSerializer()); - Sink sink = BigQuerySink.get(sinkConfig); + if (sinkPartitionField != null) { + sinkConfigBuilder + .enableTableCreation(true) + .partitionField(sinkPartitionField) + .partitionType(TimePartitioning.Type.DAY); + } env.fromSource( source, @@ -368,10 +430,14 @@ private static void runStreamingFlinkJob( }) // Type hinting is required for Avro's GenericRecord to be serialized/deserialized // when flowing through the job graph. + // Since the final record has same schema as the input from source, we use the same + // avro schema here. Ideally, users need to explicitly set the avro schema according + // to the record recieved by the sink. 
.returns( new GenericRecordAvroTypeInfo( - sinkConfig.getSchemaProvider().getAvroSchema())) - .sinkTo(sink); + new BigQuerySchemaProviderImpl(sourceConnectOptions) + .getAvroSchema())) + .sinkTo(BigQuerySink.get(sinkConfigBuilder.build())); env.execute("Flink BigQuery Unbounded Source And Sink Example"); } diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery-integration-test/src/main/java/com/google/cloud/flink/bigquery/integration/BigQueryIntegrationTest.java b/flink-1.17-connector-bigquery/flink-connector-bigquery-integration-test/src/main/java/com/google/cloud/flink/bigquery/integration/BigQueryIntegrationTest.java index 3e36c094..0399eb54 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery-integration-test/src/main/java/com/google/cloud/flink/bigquery/integration/BigQueryIntegrationTest.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery-integration-test/src/main/java/com/google/cloud/flink/bigquery/integration/BigQueryIntegrationTest.java @@ -25,10 +25,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration; import org.apache.flink.api.common.state.CheckpointListener; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; @@ -42,7 +39,6 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.api.DataTypes; @@ -55,6 +51,7 @@ import org.apache.flink.types.Row; import org.apache.flink.util.Collector; +import com.google.cloud.bigquery.TimePartitioning; import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions; import com.google.cloud.flink.bigquery.sink.BigQuerySink; import com.google.cloud.flink.bigquery.sink.BigQuerySinkConfig; @@ -67,6 +64,7 @@ import com.google.cloud.flink.bigquery.table.config.BigQueryReadTableConfig; import com.google.cloud.flink.bigquery.table.config.BigQuerySinkTableConfig; import com.google.cloud.flink.bigquery.table.config.BigQueryTableConfig; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.slf4j.Logger; @@ -76,7 +74,6 @@ import java.time.Instant; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; -import java.util.Objects; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -113,6 +110,7 @@ *
  • --bq-dest-table {optional; name of Destination BigQuery table to write}
    *
  • --sink-parallelism {optional; parallelism for sink job} *
  • --exactly-once {optional; set flag to enable exactly once approach} + *
• --enable-table-creation {optional; set flag to enable BQ table creation in sink} *
  • --is-sql {optional; set flag to run Table API methods for read and write} * * The sequence of operations in the read and write pipeline is: source > map > sink. @@ -138,7 +136,8 @@ * {GCP_DESTINATION_PROJECT_ID} --bq-dest-dataset {BigQuery Destination Dataset Name} * --bq-dest-table {BigQuery Destination Table Name} --sink-parallelism {Parallelism to be * followed by the sink} --exactly-once {set flag to enable exactly once approach} --is-sql - * {set flag to enable running Flink's Table API methods}}
    + * {set flag to enable running Flink's Table API methods} --enable-table-creation {set flag + * for BQ table creation in sink}}
    *
  • Unbounded Job: an unbounded source (GCS Bucket) and writing to a BigQuery Table in the * unbounded mode.
    * This test requires some additional arguments besides the ones mentioned in the bounded @@ -203,6 +202,7 @@ public static void main(String[] args) throws Exception { + " --bq-dest-table " + " --sink-parallelism " + " --exactly-once " + + " --enable-table-creation " + " --mode " + " --query " + " --file-discovery-interval "); @@ -223,6 +223,7 @@ public static void main(String[] args) throws Exception { Integer sinkParallelism = parameterTool.getInt("sink-parallelism"); boolean isSqlEnabled = parameterTool.getBoolean("is-sql", false); boolean isExactlyOnceEnabled = parameterTool.getBoolean("exactly-once", false); + Boolean enableTableCreation = parameterTool.getBoolean("enable-table-creation", false); // Ignored for bounded run and can be set for unbounded mode (not required). String mode = parameterTool.get("mode", "bounded"); @@ -252,7 +253,8 @@ public static void main(String[] args) throws Exception { destDatasetName, destTableName, isExactlyOnceEnabled, - sinkParallelism); + sinkParallelism, + enableTableCreation); break; case "unbounded": gcsSourceUri = parameterTool.getRequired("gcs-source-uri"); @@ -290,7 +292,8 @@ public static void main(String[] args) throws Exception { destDatasetName, destTableName, isExactlyOnceEnabled, - sinkParallelism); + sinkParallelism, + enableTableCreation); break; case "unbounded": gcsSourceUri = parameterTool.getRequired("gcs-source-uri"); @@ -335,6 +338,7 @@ public static void main(String[] args) throws Exception { private static void runQueryFlinkJob(String projectName, String query) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RESTART_STRATEGY); env.enableCheckpointing(CHECKPOINT_INTERVAL); BigQuerySource bqSource = @@ -355,22 +359,24 @@ private static void runBoundedFlinkJobWithSink( String destDatasetName, String destTableName, boolean exactlyOnce, - Integer sinkParallelism) + Integer sinkParallelism, + boolean enableTableCreation) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(CHECKPOINT_INTERVAL); env.setRestartStrategy(RESTART_STRATEGY); + BigQueryConnectOptions sourceConnectOptions = + BigQueryConnectOptions.builder() + .setProjectId(sourceGcpProjectName) + .setDataset(sourceDatasetName) + .setTable(sourceTableName) + .build(); BigQuerySource source = BigQuerySource.readAvros( BigQueryReadOptions.builder() - .setBigQueryConnectOptions( - BigQueryConnectOptions.builder() - .setProjectId(sourceGcpProjectName) - .setDataset(sourceDatasetName) - .setTable(sourceTableName) - .build()) + .setBigQueryConnectOptions(sourceConnectOptions) .build()); BigQueryConnectOptions sinkConnectOptions = @@ -380,20 +386,24 @@ private static void runBoundedFlinkJobWithSink( .setTable(destTableName) .build(); - BigQuerySchemaProvider destSchemaProvider = - new BigQuerySchemaProviderImpl(sinkConnectOptions); - - BigQuerySinkConfig sinkConfig = + BigQuerySinkConfig.Builder sinkConfigBuilder = BigQuerySinkConfig.newBuilder() .connectOptions(sinkConnectOptions) - .schemaProvider(destSchemaProvider) .serializer(new AvroToProtoSerializer()) .deliveryGuarantee( exactlyOnce ? 
DeliveryGuarantee.EXACTLY_ONCE : DeliveryGuarantee.AT_LEAST_ONCE) - .streamExecutionEnvironment(env) - .build(); + .streamExecutionEnvironment(env); + + if (enableTableCreation) { + sinkConfigBuilder + .enableTableCreation(true) + .partitionField("ts") + .partitionType(TimePartitioning.Type.DAY); + } + + BigQuerySinkConfig sinkConfig = sinkConfigBuilder.build(); DataStreamSink boundedStreamSink = env.fromSource( @@ -413,7 +423,7 @@ public GenericRecord map(GenericRecord genericRecord) }) .returns( new GenericRecordAvroTypeInfo( - sinkConfig.getSchemaProvider().getAvroSchema())) + getAvroTableSchema(sourceConnectOptions))) .sinkTo(BigQuerySink.get(sinkConfig)); if (sinkParallelism != null) { boundedStreamSink.setParallelism(sinkParallelism); @@ -591,6 +601,8 @@ private static void runBoundedFlinkJob( * @param destDatasetName Dataset name of the destination table. * @param destTableName Destination Table Name. * @param isExactlyOnce Boolean value, True if exactly-once mode, false otherwise. + * @param sinkParallelism Sink's parallelism. + * @param enableTableCreation Create BQ table in sink. * @throws Exception in a case of error, obtaining Table Descriptor. */ private static void runBoundedSQLFlinkJob( @@ -601,7 +613,8 @@ private static void runBoundedSQLFlinkJob( String destDatasetName, String destTableName, boolean isExactlyOnce, - Integer sinkParallelism) + Integer sinkParallelism, + boolean enableTableCreation) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -609,6 +622,12 @@ private static void runBoundedSQLFlinkJob( env.setRestartStrategy(RESTART_STRATEGY); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + BigQueryConnectOptions sourceConnectOptions = + BigQueryConnectOptions.builder() + .setProjectId(sourceGcpProjectName) + .setDataset(sourceDatasetName) + .setTable(sourceTableName) + .build(); // Declare Read Options. BigQueryTableConfig readTableConfig = BigQueryReadTableConfig.newBuilder() @@ -630,7 +649,7 @@ private static void runBoundedSQLFlinkJob( .select($("*")) .addOrReplaceColumns(concat($("name"), "_write_test").as("name")); - BigQueryTableConfig sinkTableConfig = + BigQuerySinkTableConfig.Builder sinkTableConfigBuilder = BigQuerySinkTableConfig.newBuilder() .project(destGcpProjectName) .dataset(destDatasetName) @@ -640,17 +659,29 @@ private static void runBoundedSQLFlinkJob( .deliveryGuarantee( isExactlyOnce ? 
DeliveryGuarantee.EXACTLY_ONCE - : DeliveryGuarantee.AT_LEAST_ONCE) - .build(); + : DeliveryGuarantee.AT_LEAST_ONCE); + + TableDescriptor descriptor; + if (enableTableCreation) { + sinkTableConfigBuilder + .enableTableCreation(true) + .partitionField("ts") + .partitionType(TimePartitioning.Type.DAY); + org.apache.flink.table.api.Schema tableSchema = + getFlinkTableSchema(sourceConnectOptions); + descriptor = + BigQueryTableSchemaProvider.getTableDescriptor( + sinkTableConfigBuilder.build(), tableSchema); + } else { + descriptor = + BigQueryTableSchemaProvider.getTableDescriptor(sinkTableConfigBuilder.build()); + } // Register the Sink Table - tEnv.createTable( - "bigQuerySinkTable", - BigQueryTableSchemaProvider.getTableDescriptor(sinkTableConfig)); + tEnv.createTable("bigQuerySinkTable", descriptor); // Insert the table sourceTable to the registered sinkTable - TableResult res = sourceTable.executeInsert("bigQuerySinkTable"); - res.await(); + sourceTable.executeInsert("bigQuerySinkTable").await(30, TimeUnit.MINUTES); } /** @@ -827,44 +858,6 @@ public String map(GenericRecord value) throws Exception { } } - static class CustomKeyedProcessFunction - extends KeyedProcessFunction, Long> { - - private transient ValueState numRecords; - private final Long expectedValue; - - CustomKeyedProcessFunction(Long expectedValue) { - this.expectedValue = expectedValue; - } - - @Override - public void open(Configuration config) { - ValueStateDescriptor descriptor = - new ValueStateDescriptor<>("numRecords", TypeInformation.of(Long.class), 0L); - this.numRecords = getRuntimeContext().getState(descriptor); - } - - @Override - public void processElement( - Tuple2 value, - KeyedProcessFunction, Long>.Context ctx, - Collector out) - throws Exception { - this.numRecords.update(this.numRecords.value() + 1); - if (this.numRecords.value() > this.expectedValue) { - LOG.info( - String.format( - "Number of records processed (%d) exceed the expected count (%d)", - this.numRecords.value(), this.expectedValue)); - } else if (Objects.equals(this.numRecords.value(), this.expectedValue)) { - LOG.info( - String.format( - "%d number of records have been processed", this.expectedValue)); - } - out.collect(this.numRecords.value()); - } - } - static class FailingMapper implements MapFunction, CheckpointListener { @@ -881,4 +874,14 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { } } } + + private static Schema getAvroTableSchema(BigQueryConnectOptions connectOptions) { + return new BigQuerySchemaProviderImpl(connectOptions).getAvroSchema(); + } + + private static org.apache.flink.table.api.Schema getFlinkTableSchema( + BigQueryConnectOptions connectOptions) { + Schema avroSchema = new BigQuerySchemaProviderImpl(connectOptions).getAvroSchema(); + return BigQueryTableSchemaProvider.getTableApiSchemaFromAvroSchema(avroSchema); + } } diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery-table-api-examples/src/main/java/com/google/cloud/flink/bigquery/examples/BigQueryTableExample.java b/flink-1.17-connector-bigquery/flink-connector-bigquery-table-api-examples/src/main/java/com/google/cloud/flink/bigquery/examples/BigQueryTableExample.java index 6d241f23..39af2e6d 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery-table-api-examples/src/main/java/com/google/cloud/flink/bigquery/examples/BigQueryTableExample.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery-table-api-examples/src/main/java/com/google/cloud/flink/bigquery/examples/BigQueryTableExample.java @@ 
-21,23 +21,23 @@ import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.annotation.DataTypeHint; -import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableDescriptor; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.types.Row; +import com.google.cloud.bigquery.TimePartitioning; +import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions; +import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProviderImpl; import com.google.cloud.flink.bigquery.sink.serializer.BigQueryTableSchemaProvider; import com.google.cloud.flink.bigquery.table.config.BigQueryReadTableConfig; import com.google.cloud.flink.bigquery.table.config.BigQuerySinkTableConfig; import com.google.cloud.flink.bigquery.table.config.BigQueryTableConfig; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.flink.table.api.Expressions.$; -import static org.apache.flink.table.api.Expressions.call; /** * A simple BigQuery table read and sink example with Flink's Table API. @@ -62,6 +62,8 @@ * --gcp-sink-project {required; project ID containing the sink table}
    * --bq-sink-dataset {required; name of dataset containing the sink table}
    * --bq-sink-table {required; name of table to write to}
    + * --sink-partition-field {optional; partition field for destination table. Also enables table + * creation}
    * --mode {optional; source read type. Allowed values are bounded (default) or unbounded or * hybrid}
    * --restriction {optional; SQL filter applied at the BigQuery table before reading}
    @@ -91,6 +93,7 @@ public static void main(String[] args) throws Exception { + " --gcp-sink-project " + " --bq-sink-dataset " + " --bq-sink-table " + + " --sink-partition-field " + " --mode " + " --restriction " + " --limit " @@ -120,6 +123,7 @@ public static void main(String[] args) throws Exception { String destGcpProjectName = parameterTool.getRequired("gcp-sink-project"); String destDatasetName = parameterTool.getRequired("bq-sink-dataset"); String destTableName = parameterTool.getRequired("bq-sink-table"); + String sinkPartitionField = parameterTool.get("sink-partition-field", null); String deliveryGuarantee = parameterTool.get("delivery-guarantee", "at-least-once"); DeliveryGuarantee sinkMode; switch (deliveryGuarantee) { @@ -145,6 +149,7 @@ public static void main(String[] args) throws Exception { destGcpProjectName, destDatasetName, destTableName, + sinkPartitionField, sinkMode, rowRestriction, recordLimit, @@ -158,6 +163,7 @@ public static void main(String[] args) throws Exception { destGcpProjectName, destDatasetName, destTableName, + sinkPartitionField, sinkMode, rowRestriction, recordLimit, @@ -183,6 +189,7 @@ public static void main(String[] args) throws Exception { * @param destGcpProjectName The GCP Project name of the destination table. * @param destDatasetName Dataset name of the destination table. * @param destTableName Destination Table Name. + * @param sinkPartitionField Sink table partitioning. * @param sinkMode At-least-once or exactly-once write consistency. * @param rowRestriction String value, filtering the rows to be read. * @param limit Integer value, Number of rows to limit the read result. @@ -196,6 +203,7 @@ private static void runBoundedTableAPIFlinkJob( String destGcpProjectName, String destDatasetName, String destTableName, + String sinkPartitionField, DeliveryGuarantee sinkMode, String rowRestriction, Integer limit, @@ -205,8 +213,13 @@ private static void runBoundedTableAPIFlinkJob( final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(checkpointInterval); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - tEnv.createTemporarySystemFunction("func", MyFlatMapFunction.class); + BigQueryConnectOptions sourceConnectOptions = + BigQueryConnectOptions.builder() + .setProjectId(sourceGcpProjectName) + .setDataset(sourceDatasetName) + .setTable(sourceTableName) + .build(); // Declare Read Options. BigQueryTableConfig readTableConfig = BigQueryReadTableConfig.newBuilder() @@ -224,26 +237,40 @@ private static void runBoundedTableAPIFlinkJob( BigQueryTableSchemaProvider.getTableDescriptor(readTableConfig)); // Read the table and pass to flatmap. - Table sourceTable = - tEnv.from("bigQuerySourceTable") - .select($("*")) - .flatMap(call("func", Row.of($("name"), $("number"), $("ts")))) - .as("name", "number", "ts"); + // Hardcoded source schema to extract three columns: name (string), number (int) and ts + // (timestamp). 
+ Table sourceTable = tEnv.from("bigQuerySourceTable").select($("*")); - BigQueryTableConfig sinkTableConfig = + BigQuerySinkTableConfig.Builder sinkTableConfigBuilder = BigQuerySinkTableConfig.newBuilder() .project(destGcpProjectName) .dataset(destDatasetName) .table(destTableName) .sinkParallelism(2) .deliveryGuarantee(sinkMode) - .streamExecutionEnvironment(env) - .build(); + .streamExecutionEnvironment(env); + + TableDescriptor descriptor; + if (sinkPartitionField != null) { + sinkTableConfigBuilder + .enableTableCreation(true) + .partitionField(sinkPartitionField) + .partitionType(TimePartitioning.Type.DAY); + // Since the final record has same schema as the input from source, we use the same + // table schema here. Ideally, users need to explicitly set the table schema according + // to the record recieved by the sink. + org.apache.flink.table.api.Schema tableSchema = + getFlinkTableSchema(sourceConnectOptions); + descriptor = + BigQueryTableSchemaProvider.getTableDescriptor( + sinkTableConfigBuilder.build(), tableSchema); + } else { + descriptor = + BigQueryTableSchemaProvider.getTableDescriptor(sinkTableConfigBuilder.build()); + } // Register the Sink Table - tEnv.createTable( - "bigQuerySinkTable", - BigQueryTableSchemaProvider.getTableDescriptor(sinkTableConfig)); + tEnv.createTable("bigQuerySinkTable", descriptor); // Insert the table sourceTable to the registered sinkTable sourceTable.executeInsert("bigQuerySinkTable"); @@ -262,6 +289,7 @@ private static void runBoundedTableAPIFlinkJob( * @param destGcpProjectName The GCP Project name of the destination table. * @param destDatasetName Dataset name of the destination table. * @param destTableName Destination Table Name. + * @param sinkPartitionField Sink table partitioning. * @param sinkMode At-least-once or exactly-once write consistency. * @param rowRestriction String value, filtering the rows to be read. * @param limit Integer value, Number of rows to limit the read result. @@ -275,6 +303,7 @@ private static void runStreamingTableAPIFlinkJob( String destGcpProjectName, String destDatasetName, String destTableName, + String sinkPartitionField, DeliveryGuarantee sinkMode, String rowRestriction, Integer limit, @@ -285,8 +314,13 @@ private static void runStreamingTableAPIFlinkJob( final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(checkpointInterval); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - tEnv.createTemporarySystemFunction("func", MyFlatMapFunction.class); + BigQueryConnectOptions sourceConnectOptions = + BigQueryConnectOptions.builder() + .setProjectId(sourceGcpProjectName) + .setDataset(sourceDatasetName) + .setTable(sourceTableName) + .build(); // Declare Read Options. BigQueryTableConfig readTableConfig = BigQueryReadTableConfig.newBuilder() @@ -309,28 +343,39 @@ private static void runStreamingTableAPIFlinkJob( sourceTable = sourceTable.select($("*")); // Declare Write Options. 
- BigQueryTableConfig sinkTableConfig = + BigQuerySinkTableConfig.Builder sinkTableConfigBuilder = BigQuerySinkTableConfig.newBuilder() .table(destTableName) .project(destGcpProjectName) .dataset(destDatasetName) .sinkParallelism(2) .deliveryGuarantee(sinkMode) - .streamExecutionEnvironment(env) - .build(); + .streamExecutionEnvironment(env); + + TableDescriptor descriptor; + if (sinkPartitionField != null) { + sinkTableConfigBuilder + .enableTableCreation(true) + .partitionField(sinkPartitionField) + .partitionType(TimePartitioning.Type.DAY); + // Since the final record has same schema as the input from source, we use the same + // table schema here. Ideally, users need to explicitly set the table schema according + // to the record recieved by the sink. + org.apache.flink.table.api.Schema tableSchema = + getFlinkTableSchema(sourceConnectOptions); + descriptor = + BigQueryTableSchemaProvider.getTableDescriptor( + sinkTableConfigBuilder.build(), tableSchema); + } else { + descriptor = + BigQueryTableSchemaProvider.getTableDescriptor(sinkTableConfigBuilder.build()); + } // Register the Sink Table - tEnv.createTable( - "bigQuerySinkTable", - BigQueryTableSchemaProvider.getTableDescriptor(sinkTableConfig)); + tEnv.createTable("bigQuerySinkTable", descriptor); // Insert the table sourceTable to the registered sinkTable - sourceTable = - sourceTable - .flatMap(call("func", Row.of($("name"), $("number"), $("ts")))) - .as("name", "number", "ts"); - - sourceTable.executeInsert("bigQuerySinkTable"); + sourceTable.executeInsert("bigQuerySinkTable").await(); } /** @@ -371,7 +416,6 @@ public static void runBoundedJoinFlinkJob( final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(checkpointInterval); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - tEnv.createTemporarySystemFunction("func", MyFlatMapFunction.class); // Declare Read Options. BigQueryTableConfig readTableConfig = @@ -434,15 +478,9 @@ public static void runBoundedJoinFlinkJob( + "leftSourceTable.id = rightSourceTable.id;"); } - /** Function to flatmap the Table API source Catalog Table. 
*/ - @FunctionHint( - input = @DataTypeHint("ROW<`name` STRING, `number` BIGINT, `ts` TIMESTAMP(6)>"), - output = @DataTypeHint("ROW<`name` STRING, `number` BIGINT, `ts` TIMESTAMP(6)>")) - public static class MyFlatMapFunction extends TableFunction { - - public void eval(Row row) { - String str = (String) row.getField("name"); - collect(Row.of(str + "_write_test", row.getField("number"), row.getField("ts"))); - } + private static org.apache.flink.table.api.Schema getFlinkTableSchema( + BigQueryConnectOptions connectOptions) { + Schema avroSchema = new BigQuerySchemaProviderImpl(connectOptions).getAvroSchema(); + return BigQueryTableSchemaProvider.getTableApiSchemaFromAvroSchema(avroSchema); } } diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/serializer/BigQueryTableSchemaProvider.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/serializer/BigQueryTableSchemaProvider.java index 1d735636..317d6dfa 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/serializer/BigQueryTableSchemaProvider.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/serializer/BigQueryTableSchemaProvider.java @@ -73,7 +73,7 @@ public static Schema getAvroSchemaFromLogicalSchema(LogicalType logicalType) { return avroSchemaConvertor.convertToSchema(logicalType); } - private static org.apache.flink.table.api.Schema getTableApiSchemaFromAvroSchema( + public static org.apache.flink.table.api.Schema getTableApiSchemaFromAvroSchema( Schema avroSchema) { Preconditions.checkNotNull( avroSchema, "Avro Schema not initialized before obtaining Table API Schema."); diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/BigQuerySource.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/BigQuerySource.java index 642ae221..48956551 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/BigQuerySource.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/BigQuerySource.java @@ -335,6 +335,7 @@ public static BigQuerySource readAvros(BigQueryReadOptions readOp * @return A fully initialized instance of the source, ready to read {@link GenericRecord} from * the underlying table. 
*/ + @Deprecated public static BigQuerySource streamAvros(BigQueryReadOptions readOptions) { BigQueryConnectOptions connectOptions = readOptions.getBigQueryConnectOptions(); TableSchema tableSchema = diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/table/config/BigQueryReadTableConfig.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/table/config/BigQueryReadTableConfig.java index d58e35c2..4a9bbfb5 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/table/config/BigQueryReadTableConfig.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/table/config/BigQueryReadTableConfig.java @@ -33,7 +33,7 @@ public class BigQueryReadTableConfig extends BigQueryTableConfig { private final String columnProjection; private final Integer maxStreamCount; private final Long snapshotTimestamp; - private final Boundedness boundedness; + @Deprecated private final Boundedness boundedness; private final Integer partitionDiscoveryInterval; BigQueryReadTableConfig( @@ -220,6 +220,7 @@ public BigQueryReadTableConfig.Builder snapshotTimestamp(Long snapshotTimestamp) *
    * Default: Boundedness.BOUNDED - Bounded mode. */ + @Deprecated public BigQueryReadTableConfig.Builder boundedness(Boundedness boundedness) { this.boundedness = boundedness; return this;
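Since `boundedness` is now deprecated on `BigQueryReadTableConfig` (and `streamAvros` on the source), a read-side table config can simply omit the setter and rely on the documented default of `Boundedness.BOUNDED`. A minimal sketch under that assumption; the `project`/`dataset`/`table` builder methods are taken to be the ones inherited from `BigQueryTableConfig`, and the identifiers are placeholders:

```java
// Sketch only: a bounded Table API read without the deprecated boundedness(...) setter.
// Placeholder project/dataset/table values; builder methods assumed from BigQueryTableConfig.
BigQueryTableConfig readTableConfig =
        BigQueryReadTableConfig.newBuilder()
                .project("some-gcp-project")
                .dataset("some_dataset")
                .table("some_table")
                // .boundedness(...) is deprecated; omitting it keeps the default bounded read.
                .build();
```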