diff --git a/flink-connector-bigquery-examples/src/main/java/com/google/cloud/flink/bigquery/examples/BigQueryExample.java b/flink-connector-bigquery-examples/src/main/java/com/google/cloud/flink/bigquery/examples/BigQueryExample.java
index 85a2f4f4..1d2cb85c 100644
--- a/flink-connector-bigquery-examples/src/main/java/com/google/cloud/flink/bigquery/examples/BigQueryExample.java
+++ b/flink-connector-bigquery-examples/src/main/java/com/google/cloud/flink/bigquery/examples/BigQueryExample.java
@@ -34,20 +34,34 @@
 /**
  * A simple Flink application using DataStream API and BigQuery connector.
  *
- * <p>The Flink pipeline will try to read the specified BigQuery table, limiting the element count
- * to the specified row restriction and limit, returning {@link GenericRecord} representing the
- * rows, and finally print out some aggregated values given the provided payload's field. The
- * sequence of operations in this pipeline is: source > flatMap > keyBy > max > print.
+ * <p>The Flink pipeline will try to read the specified BigQuery table according to the given
+ * command line arguments, returning {@link GenericRecord} representing the rows, and finally print
+ * out some aggregated values given the provided payload's field. The sequence of operations in this
+ * pipeline is: source > flatMap > keyBy > sum > print.

Flink command line format to execute this application:
- * flink run {additional runtime params} {path to this jar}/BigQueryExample.jar
- * --gcp-project {required; project ID which contains the BigQuery table}
- * --bq-dataset {required; name of BigQuery dataset containing the desired table}
- * --bq-table {required; name of BigQuery table to read}
- * --agg-prop {required; record property to aggregate in Flink job}
- * --restriction {optional; SQL-like filter applied at the BigQuery table before reading}
- * --limit {optional; maximum records to read from BigQuery table}
- * --checkpoint-interval {optional; time interval between state checkpoints in milliseconds} + *

This example module should be used in one of the following two ways. + * + *

    + *
  1. Specify the BQ dataset and table with an optional row restriction. Flink command line + * format to execute this mode is:
    + * flink run {additional runtime params} {path to this jar}/BigQueryExample.jar
    + * --gcp-project {required; project ID which contains the BigQuery table}
    + * --bq-dataset {required; name of BigQuery dataset containing the desired table}
    + * --bq-table {required; name of BigQuery table to read}
    + * --agg-prop {required; record property to aggregate in Flink job}
    + * --restriction {optional; SQL filter applied at the BigQuery table before reading}
    + * --limit {optional; maximum records to read from BigQuery table}
    + * --checkpoint-interval {optional; time interval between state checkpoints in milliseconds} + *
  2. Specify SQL query to fetch data from BQ dataset. For example, "SELECT * FROM + * some_dataset.INFORMATION_SCHEMA.PARTITIONS". Flink command line format to execute this mode + * is:
    + * flink run {additional runtime params} {path to this jar}/BigQueryExample.jar
    + * --gcp-project {required; project ID which contains the BigQuery table}
    + * --query {required; SQL query to fetch data from BigQuery table}
    + * --agg-prop {required; record property to aggregate in Flink job}
    + * --limit {optional; maximum records to read from BigQuery table}
    + * --checkpoint-interval {optional; time interval between state checkpoints in milliseconds} + *
* *
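To make the two documented modes concrete, here is a minimal sketch (hypothetical class name, not part of this patch) that mirrors the dispatch the new main() below performs with the same ParameterTool calls: a non-empty --query argument selects query mode, otherwise the dataset and table arguments become required.

    import org.apache.flink.api.java.utils.ParameterTool;

    // Illustrative only: mirrors the mode selection introduced in main() below.
    public class ModeDispatchSketch {
        public static void main(String[] args) {
            ParameterTool params = ParameterTool.fromArgs(args);
            String query = params.get("query", "");
            if (!query.isEmpty()) {
                System.out.println("query mode: " + query);
            } else {
                // Table mode: dataset and table are mandatory, restriction stays optional.
                System.out.println(
                        "table mode: "
                                + params.getRequired("bq-dataset")
                                + "."
                                + params.getRequired("bq-table"));
            }
        }
    }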

Note on row restriction: In case a restriction relies on temporal reference, something like * {@code "TIMESTAMP_TRUNC(ingestion_timestamp, HOUR) = '2023-06-20 19:00:00'"}, and if launching @@ -63,36 +77,73 @@ public static void main(String[] args) throws Exception { // parse input arguments final ParameterTool parameterTool = ParameterTool.fromArgs(args); - if (parameterTool.getNumberOfParameters() < 4) { + if (parameterTool.getNumberOfParameters() < 1) { LOG.error( "Missing parameters!\n" - + "Usage: flink run BigQuery.jar" - + " --gcp-project " + + "Usage: flink run " + + " --gcp-project " + " --bq-dataset " + " --bq-table " - + " --agg-prop " + + " --agg-prop " + " --restriction " - + " --limit " - + " --checkpoint-interval "); + + " --limit " + + " --checkpoint-interval " + + " --query "); return; } String projectName = parameterTool.getRequired("gcp-project"); - String datasetName = parameterTool.getRequired("bq-dataset"); - String tableName = parameterTool.getRequired("bq-table"); - String rowRestriction = parameterTool.get("restriction", "").replace("\\u0027", "'"); - Integer recordLimit = parameterTool.getInt("limit", -1); + String query = parameterTool.get("query", ""); String recordPropertyToAggregate = parameterTool.getRequired("agg-prop"); + Integer recordLimit = parameterTool.getInt("limit", -1); Long checkpointInterval = parameterTool.getLong("checkpoint-interval", 60000L); - runFlinkJob( - projectName, - datasetName, - tableName, - recordPropertyToAggregate, - rowRestriction, - recordLimit, - checkpointInterval); + if (!query.isEmpty()) { + runFlinkQueryJob( + projectName, query, recordPropertyToAggregate, recordLimit, checkpointInterval); + } else { + String datasetName = parameterTool.getRequired("bq-dataset"); + String tableName = parameterTool.getRequired("bq-table"); + String rowRestriction = parameterTool.get("restriction", "").replace("\\u0027", "'"); + + runFlinkJob( + projectName, + datasetName, + tableName, + recordPropertyToAggregate, + rowRestriction, + recordLimit, + checkpointInterval); + } + } + + private static void runFlinkQueryJob( + String projectName, + String query, + String recordPropertyToAggregate, + Integer limit, + Long checkpointInterval) + throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(checkpointInterval); + + /** + * we will be reading avro generic records from BigQuery, and in this case we are assuming + * the GOOGLE_APPLICATION_CREDENTIALS env variable will be present in the execution runtime. + * In case of needing authenticate differently, the credentials builder (part of the + * BigQueryConnectOptions) should enable capturing the credentials from various sources. 
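The query pipeline above keys the stream by the aggregated property and sums the constant 1 attached to each record, so it prints a running count per key. A minimal local sketch of that same shape (hypothetical class, with in-memory data standing in for the BigQuery source):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Illustrative only: same keyBy/sum shape as runFlinkQueryJob, fed from memory.
    public class SumPipelineSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements("a", "a", "b")
                    .map(value -> Tuple2.of(value, 1))
                    // Lambdas erase tuple generics, so give Flink an explicit type hint.
                    .returns(Types.TUPLE(Types.STRING, Types.INT))
                    .keyBy(tuple -> tuple.f0)
                    .sum("f1")
                    // Prints running per-key counts: (a,1), (a,2), (b,1);
                    // interleaving may vary across parallel subtasks.
                    .print();
            env.execute("sum sketch");
        }
    }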
@@ -130,10 +181,10 @@ private static void runFlinkJob(
         env.fromSource(bqSource, WatermarkStrategy.noWatermarks(), "BigQuerySource")
                 .flatMap(new FlatMapper(recordPropertyToAggregate))
                 .keyBy(mappedTuple -> mappedTuple.f0)
-                .max("f1")
+                .sum("f1")
                 .print();
 
-        env.execute("Flink BigQuery Example");
+        env.execute("Flink BigQuery example");
     }
 
     static class FlatMapper implements FlatMapFunction<GenericRecord, Tuple2<String, Integer>> {
@@ -147,9 +198,7 @@ public FlatMapper(String recordPropertyToAggregate) {
         @Override
         public void flatMap(GenericRecord record, Collector<Tuple2<String, Integer>> out)
                 throws Exception {
-            out.collect(
-                    Tuple2.of(
-                            (String) record.get(recordPropertyToAggregate).toString(), 1));
+            out.collect(Tuple2.of((String) record.get(recordPropertyToAggregate).toString(), 1));
         }
     }
 }
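Two closing observations. The max("f1") to sum("f1") switch matters because FlatMapper attaches a constant 1 to every record, so max could only ever emit 1 per key, while sum yields the intended per-key count. Since record.get(...) may return Avro's Utf8 rather than String, the toString() call is what makes the key usable (the explicit (String) cast after toString() is redundant, though harmless). A small hypothetical harness for checking that mapping in isolation; it assumes it lives in the com.google.cloud.flink.bigquery.examples package, since FlatMapper is package-private:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Illustrative only: drives FlatMapper directly, no cluster required.
    public class FlatMapperHarness {
        public static void main(String[] args) throws Exception {
            Schema schema =
                    SchemaBuilder.record("row").fields().requiredString("name").endRecord();
            GenericRecord record = new GenericData.Record(schema);
            record.put("name", "some-value");

            List<Tuple2<String, Integer>> collected = new ArrayList<>();
            Collector<Tuple2<String, Integer>> collector =
                    new Collector<Tuple2<String, Integer>>() {
                        @Override
                        public void collect(Tuple2<String, Integer> tuple) {
                            collected.add(tuple);
                        }

                        @Override
                        public void close() {}
                    };

            new BigQueryExample.FlatMapper("name").flatMap(record, collector);
            System.out.println(collected); // [(some-value,1)]
        }
    }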