Adds a "read from query results" example #53

Merged
merged 137 commits on Dec 5, 2023
Changes from all commits
Commits
137 commits
eb9dc8f
cleaning up the main branch for new version.
prodriguezdefino Jun 27, 2023
0695673
updates in ignore and readme files
prodriguezdefino Jun 27, 2023
e36cc44
prepping the pom addition, added parent's compliance tools
prodriguezdefino Jun 27, 2023
e167803
adding parent pom and the connector impl project pom
prodriguezdefino Jun 28, 2023
8043378
adding common functionalities
prodriguezdefino Jun 28, 2023
b455b4e
added the bigquery services wrapper and factories
prodriguezdefino Jun 28, 2023
27cd837
creates the split, its state and the enumerator state
prodriguezdefino Jun 28, 2023
4d4b60f
added configs, split reader and split assignment impls
prodriguezdefino Jun 28, 2023
0567b58
applying recommendations from sonartype-lift
prodriguezdefino Jun 28, 2023
9006714
adding the Datastream source implementation for BigQuery
prodriguezdefino Jun 28, 2023
d5d95bf
added Table API implementation for BigQuery
prodriguezdefino Jun 29, 2023
1263768
adding the example and shaded distro jar, fixes a NPE when the provid…
prodriguezdefino Jun 29, 2023
4e185bd
added coverage analysis through open clover and improved a few tests
prodriguezdefino Jun 30, 2023
4e63b02
fixes partition and row restriction order for push downs
prodriguezdefino Jul 1, 2023
74d5b90
adds a test which makes the storage read server stream iterator fail f…
prodriguezdefino Jul 2, 2023
aff1d6a
renamed example's packages and pom's group id to the correct one
prodriguezdefino Jul 2, 2023
57d5a48
resolve locally errors that occur in the split reader
prodriguezdefino Jul 2, 2023
b16392f
added a read from query results example
prodriguezdefino Jul 4, 2023
5743292
merge changes from master (previous pom deletion resolution)
prodriguezdefino Jul 6, 2023
cab9115
fixing the package name for the schema namespace
prodriguezdefino Jul 6, 2023
3375d54
Merge branch 'common_code_source' into bq_services_wrappers
prodriguezdefino Jul 10, 2023
849f769
merged main branch and took care of few lift comments
prodriguezdefino Jul 11, 2023
fd70b95
Merge branch 'bq_services_wrappers' into source_splits
prodriguezdefino Jul 11, 2023
0f18d14
merge from source_split
prodriguezdefino Jul 11, 2023
6b08119
fixing lift recommendations and spotless
prodriguezdefino Jul 11, 2023
5973d26
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Jul 11, 2023
2d9635d
Merge branch 'source_functionality' into table_api
prodriguezdefino Jul 11, 2023
135beeb
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Jul 11, 2023
78aec3e
merge from example and distro branch
prodriguezdefino Jul 11, 2023
3ffb582
fixes namespace error in test and spotless
prodriguezdefino Jul 11, 2023
cc45b2f
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Jul 11, 2023
3472934
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
prodriguezdefino Jul 11, 2023
1dabd70
removed repeated mvn reference from command
prodriguezdefino Jul 11, 2023
27fa94a
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
prodriguezdefino Jul 11, 2023
1d0dafa
adds documentation to enum
prodriguezdefino Jul 11, 2023
289c31c
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
prodriguezdefino Jul 11, 2023
882816e
merge from test_bqstoragereadapi_error branch
prodriguezdefino Jul 11, 2023
161f577
Merge branch 'fix_artifact_groupid_name' into locally_retry_splitread…
prodriguezdefino Jul 11, 2023
d55d12f
fixes presubmit problem
prodriguezdefino Jul 11, 2023
3ce56f0
Merge branch 'locally_retry_splitreadererror' into query_example
prodriguezdefino Jul 11, 2023
f473d57
addressing comments from review
prodriguezdefino Jul 27, 2023
c178f83
merge from main
prodriguezdefino Aug 1, 2023
09eaaa4
merge from master
prodriguezdefino Aug 1, 2023
def3cc4
Merge branch 'source_splits' into split_assigner_and_reader
prodriguezdefino Aug 1, 2023
ceabb12
fixed type reference Int -> Long
prodriguezdefino Aug 1, 2023
0dc8875
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Aug 3, 2023
963c80b
Merge branch 'source_functionality' into table_api
prodriguezdefino Aug 3, 2023
5206ef1
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Aug 3, 2023
2b59a7c
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
prodriguezdefino Aug 3, 2023
4eb1137
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
prodriguezdefino Aug 3, 2023
6db9dea
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
prodriguezdefino Aug 3, 2023
237f798
Merge branch 'test_bqstoragereadapi_error' into fix_artifact_groupid_…
prodriguezdefino Aug 3, 2023
8f12843
Merge branch 'fix_artifact_groupid_name' into locally_retry_splitread…
prodriguezdefino Aug 3, 2023
6348b21
fixed reference Int -> Long
prodriguezdefino Aug 3, 2023
9913587
Merge branch 'fix_artifact_groupid_name' into query_example
prodriguezdefino Aug 3, 2023
5919fd8
Merge branch 'locally_retry_splitreadererror' into query_example
prodriguezdefino Aug 3, 2023
ea506a1
fix cloudbuild script merge
prodriguezdefino Aug 3, 2023
80cf344
Merge branch 'fix_artifact_groupid_name' into locally_retry_splitread…
prodriguezdefino Aug 3, 2023
1a80c5e
Merge branch 'fix_artifact_groupid_name' into query_example
prodriguezdefino Aug 3, 2023
12264bf
fix directory reference name for codecov
prodriguezdefino Aug 3, 2023
67955dc
Merge branch 'fix_artifact_groupid_name' into locally_retry_splitread…
prodriguezdefino Aug 4, 2023
9eb78a0
Merge branch 'fix_artifact_groupid_name' into query_example
prodriguezdefino Aug 4, 2023
1734bac
merge from main
prodriguezdefino Aug 8, 2023
e96ff59
addressing comments from review
prodriguezdefino Aug 8, 2023
3b78492
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Aug 8, 2023
c492f02
improved hashcode and equals readability
prodriguezdefino Aug 8, 2023
6a05498
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Aug 8, 2023
820fb3b
Merge branch 'source_functionality' into table_api
prodriguezdefino Aug 8, 2023
dd5be94
improve readibility for hashcode and equals
prodriguezdefino Aug 8, 2023
fbd07c6
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Aug 8, 2023
a44b11d
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
prodriguezdefino Aug 8, 2023
156552f
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
prodriguezdefino Aug 8, 2023
31559d5
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
prodriguezdefino Aug 8, 2023
53c514e
Merge branch 'test_bqstoragereadapi_error' into fix_artifact_groupid_…
prodriguezdefino Aug 8, 2023
656c831
Merge branch 'fix_artifact_groupid_name' into locally_retry_splitread…
prodriguezdefino Aug 8, 2023
bc626cd
Merge branch 'locally_retry_splitreadererror' into query_example
prodriguezdefino Aug 8, 2023
9aae0af
changed tests to use google-truth instead of junit or assertj asserti…
prodriguezdefino Aug 9, 2023
61e5644
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Aug 9, 2023
517de82
added google-truth to tests
prodriguezdefino Aug 9, 2023
d916a1c
Merge branch 'source_functionality' into table_api
prodriguezdefino Aug 9, 2023
11fce0d
added google-truth to tests
prodriguezdefino Aug 9, 2023
099078e
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Aug 10, 2023
40db3cb
merge from previous branch
prodriguezdefino Aug 10, 2023
8dbf958
merge from previous branch
prodriguezdefino Aug 10, 2023
d1c56d4
merge from previous branch
prodriguezdefino Aug 10, 2023
12553a4
Merge branch 'test_bqstoragereadapi_error' into fix_artifact_groupid_…
prodriguezdefino Aug 10, 2023
bba0167
Merge branch 'fix_artifact_groupid_name' into locally_retry_splitread…
prodriguezdefino Aug 10, 2023
c860c28
Merge branch 'locally_retry_splitreadererror' into query_example
prodriguezdefino Aug 10, 2023
9da7706
merge from master after #44
prodriguezdefino Oct 31, 2023
a10470c
Merge branch 'source_functionality' into table_api
prodriguezdefino Oct 31, 2023
6b28a0c
removing guava dependency from file
prodriguezdefino Nov 1, 2023
7160ff9
merge from master
prodriguezdefino Nov 1, 2023
87780c5
adding serializable autovalue annotation to avoid storing an Optional…
prodriguezdefino Nov 2, 2023
3f4d1be
addressing comments from review
prodriguezdefino Nov 2, 2023
c8eb789
addressed comments from review
prodriguezdefino Nov 2, 2023
fd79610
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Nov 2, 2023
8008260
merge from #47
prodriguezdefino Nov 2, 2023
c923f31
merge from #48
prodriguezdefino Nov 2, 2023
d135144
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
prodriguezdefino Nov 3, 2023
ad2e4e5
Merge branch 'test_bqstoragereadapi_error' into fix_artifact_groupid_…
prodriguezdefino Nov 3, 2023
ebe9014
merge from previous branch
prodriguezdefino Nov 3, 2023
cce6567
partition list in 1024 chunks
prodriguezdefino Nov 3, 2023
f36f91f
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Nov 3, 2023
59472b7
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
prodriguezdefino Nov 3, 2023
3d994a2
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
prodriguezdefino Nov 3, 2023
d9027ab
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
prodriguezdefino Nov 3, 2023
62c87f0
Merge branch 'test_bqstoragereadapi_error' into fix_artifact_groupid_…
prodriguezdefino Nov 3, 2023
87fd326
Merge branch 'fix_artifact_groupid_name' into locally_retry_splitread…
prodriguezdefino Nov 3, 2023
0ba5ce1
Merge branch 'locally_retry_splitreadererror' into query_example
prodriguezdefino Nov 3, 2023
5260073
Address review comments
jayehwhyehentee Nov 23, 2023
936076b
adding the example and shaded distro jar, fixes a NPE when the provid…
prodriguezdefino Jun 29, 2023
7977576
Merge branch 'add_example_and_shadedsqljar' of https://github.com/pro…
jayehwhyehentee Nov 24, 2023
1460af6
added coverage analysis through open clover and improved a few tests
prodriguezdefino Jun 30, 2023
a8499c3
removed repeated mvn reference from command
prodriguezdefino Jul 11, 2023
229f33d
Merge branch 'main' into use_openclover_coverage
jayehwhyehentee Nov 24, 2023
68d0bf9
Merge branch 'main' into add_example_and_shadedsqljar
jayehwhyehentee Nov 24, 2023
608e3c7
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
jayehwhyehentee Nov 24, 2023
9d130a4
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
jayehwhyehentee Nov 24, 2023
2711002
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
jayehwhyehentee Nov 24, 2023
c9f73ee
Address review comments
jayehwhyehentee Nov 27, 2023
8c9cc26
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
jayehwhyehentee Nov 27, 2023
9a6a3bf
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
jayehwhyehentee Nov 27, 2023
c6ba786
Address review comments
jayehwhyehentee Nov 27, 2023
27c34d0
Make checkpoint interval configurable in example jar
jayehwhyehentee Nov 27, 2023
fa4f5a4
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
jayehwhyehentee Nov 27, 2023
582879e
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
jayehwhyehentee Nov 27, 2023
2f4518a
Merge branch 'main' into partition_restriction_fixes
jayehwhyehentee Nov 27, 2023
dde0b6d
Remove incorrect dependency to fix build failure
jayehwhyehentee Nov 27, 2023
04497c5
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
jayehwhyehentee Nov 27, 2023
aac9463
Merge branch 'test_bqstoragereadapi_error' into fix_artifact_groupid_…
jayehwhyehentee Nov 27, 2023
cb2c690
Merge branch 'main' into fix_artifact_groupid_name
jayehwhyehentee Nov 28, 2023
57402ba
Merge branch 'fix_artifact_groupid_name' into locally_retry_splitread…
jayehwhyehentee Nov 28, 2023
92c1396
Merge branch 'locally_retry_splitreadererror' into query_example
jayehwhyehentee Nov 28, 2023
717a9c9
Merge branch 'main' into locally_retry_splitreadererror
jayehwhyehentee Dec 4, 2023
8ad357d
Merge branch 'locally_retry_splitreadererror' into query_example
jayehwhyehentee Dec 4, 2023
7b47e2a
Merge branch 'main' into query_example
jayehwhyehentee Dec 5, 2023
fe056c8
Fix example module and documentation
jayehwhyehentee Dec 5, 2023
@@ -34,20 +34,34 @@
/**
* A simple Flink application using DataStream API and BigQuery connector.
*
- * <p>The Flink pipeline will try to read the specified BigQuery table, limiting the element count
- * to the specified row restriction and limit, returning {@link GenericRecord} representing the
- * rows, and finally print out some aggregated values given the provided payload's field. The
- * sequence of operations in this pipeline is: <i>source > flatMap > keyBy > max > print</i>.
+ * <p>The Flink pipeline will try to read the specified BigQuery table according to given the
+ * command line arguments, returning {@link GenericRecord} representing the rows, and finally print
+ * out some aggregated values given the provided payload's field. The sequence of operations in this
+ * pipeline is: <i>source > flatMap > keyBy > sum > print</i>.
*
- * <p>Flink command line format to execute this application: <br>
- * flink run {additional runtime params} {path to this jar}/BigQueryExample.jar <br>
- * --gcp-project {required; project ID which contains the BigQuery table} <br>
- * --bq-dataset {required; name of BigQuery dataset containing the desired table} <br>
- * --bq-table {required; name of BigQuery table to read} <br>
- * --agg-prop {required; record property to aggregate in Flink job} <br>
- * --restriction {optional; SQL-like filter applied at the BigQuery table before reading} <br>
- * --limit {optional; maximum records to read from BigQuery table} <br>
- * --checkpoint-interval {optional; time interval between state checkpoints in milliseconds}
+ * <p>This example module should be used in one of the following two ways.
+ *
+ * <ol>
+ *   <li>Specify the BQ dataset and table with an optional row restriction. Flink command line
+ *       format to execute this mode is: <br>
+ *       flink run {additional runtime params} {path to this jar}/BigQueryExample.jar <br>
+ *       --gcp-project {required; project ID which contains the BigQuery table} <br>
+ *       --bq-dataset {required; name of BigQuery dataset containing the desired table} <br>
+ *       --bq-table {required; name of BigQuery table to read} <br>
+ *       --agg-prop {required; record property to aggregate in Flink job} <br>
+ *       --restriction {optional; SQL filter applied at the BigQuery table before reading} <br>
+ *       --limit {optional; maximum records to read from BigQuery table} <br>
+ *       --checkpoint-interval {optional; time interval between state checkpoints in milliseconds}
+ *   <li>Specify SQL query to fetch data from BQ dataset. For example, "SELECT * FROM
+ *       some_dataset.INFORMATION_SCHEMA.PARTITIONS". Flink command line format to execute this mode
+ *       is: <br>
+ *       flink run {additional runtime params} {path to this jar}/BigQueryExample.jar <br>
+ *       --gcp-project {required; project ID which contains the BigQuery table} <br>
+ *       --query {required; SQL query to fetch data from BigQuery table} <br>
+ *       --agg-prop {required; record property to aggregate in Flink job} <br>
+ *       --limit {optional; maximum records to read from BigQuery table} <br>
+ *       --checkpoint-interval {optional; time interval between state checkpoints in milliseconds}
+ * </ol>
*
* <p>Note on row restriction: In case a restriction relies on temporal reference, something like
* {@code "TIMESTAMP_TRUNC(ingestion_timestamp, HOUR) = '2023-06-20 19:00:00'"}, and if launching
@@ -63,36 +77,73 @@ public static void main(String[] args) throws Exception {
// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);

- if (parameterTool.getNumberOfParameters() < 4) {
+ if (parameterTool.getNumberOfParameters() < 1) {
LOG.error(
"Missing parameters!\n"
- + "Usage: flink run <additional runtime params> BigQuery.jar"
- + " --gcp-project <gcp-project>"
+ + "Usage: flink run <additional runtime params> <jar>"
+ + " --gcp-project <gcp project id>"
+ " --bq-dataset <dataset name>"
+ " --bq-table <table name>"
- + " --agg-prop <record property>"
+ + " --agg-prop <record property to aggregate>"
+ " --restriction <row filter predicate>"
- + " --limit <limit records returned>"
- + " --checkpoint-interval <milliseconds between state checkpoints>");
+ + " --limit <limit on records returned>"
+ + " --checkpoint-interval <milliseconds between state checkpoints>"
+ + " --query <SQL query to get data from BQ table>");
return;
}

String projectName = parameterTool.getRequired("gcp-project");
- String datasetName = parameterTool.getRequired("bq-dataset");
- String tableName = parameterTool.getRequired("bq-table");
- String rowRestriction = parameterTool.get("restriction", "").replace("\\u0027", "'");
- Integer recordLimit = parameterTool.getInt("limit", -1);
+ String query = parameterTool.get("query", "");
String recordPropertyToAggregate = parameterTool.getRequired("agg-prop");
+ Integer recordLimit = parameterTool.getInt("limit", -1);
Long checkpointInterval = parameterTool.getLong("checkpoint-interval", 60000L);

- runFlinkJob(
- projectName,
- datasetName,
- tableName,
- recordPropertyToAggregate,
- rowRestriction,
- recordLimit,
- checkpointInterval);
+ if (!query.isEmpty()) {
+ runFlinkQueryJob(
+ projectName, query, recordPropertyToAggregate, recordLimit, checkpointInterval);
+ } else {
+ String datasetName = parameterTool.getRequired("bq-dataset");
+ String tableName = parameterTool.getRequired("bq-table");
+ String rowRestriction = parameterTool.get("restriction", "").replace("\\u0027", "'");
+
+ runFlinkJob(
+ projectName,
+ datasetName,
+ tableName,
+ recordPropertyToAggregate,
+ rowRestriction,
+ recordLimit,
+ checkpointInterval);
+ }
}

+ private static void runFlinkQueryJob(
+ String projectName,
+ String query,
+ String recordPropertyToAggregate,
+ Integer limit,
+ Long checkpointInterval)
+ throws Exception {
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(checkpointInterval);
+
+ /**
+ * we will be reading avro generic records from BigQuery, and in this case we are assuming
+ * the GOOGLE_APPLICATION_CREDENTIALS env variable will be present in the execution runtime.
+ * In case of needing authenticate differently, the credentials builder (part of the
+ * BigQueryConnectOptions) should enable capturing the credentials from various sources.
+ */
+ BigQuerySource<GenericRecord> bqSource =
+ BigQuerySource.readAvrosFromQuery(query, projectName, limit);
+
+ env.fromSource(bqSource, WatermarkStrategy.noWatermarks(), "BigQueryQuerySource")
+ .flatMap(new FlatMapper(recordPropertyToAggregate))
+ .keyBy(mappedTuple -> mappedTuple.f0)
+ .sum("f1")
+ .print();
+
+ env.execute("Flink BigQuery query example");
+ }
+
private static void runFlinkJob(
@@ -130,10 +181,10 @@ private static void runFlinkJob(
env.fromSource(bqSource, WatermarkStrategy.noWatermarks(), "BigQuerySource")
.flatMap(new FlatMapper(recordPropertyToAggregate))
.keyBy(mappedTuple -> mappedTuple.f0)
- .max("f1")
+ .sum("f1")
.print();

- env.execute("Flink BigQuery Example");
+ env.execute("Flink BigQuery example");
}

static class FlatMapper implements FlatMapFunction<GenericRecord, Tuple2<String, Integer>> {
@@ -147,9 +198,7 @@ public FlatMapper(String recordPropertyToAggregate) {
@Override
public void flatMap(GenericRecord record, Collector<Tuple2<String, Integer>> out)
throws Exception {
- out.collect(
- Tuple2.<String, Integer>of(
- (String) record.get(recordPropertyToAggregate).toString(), 1));
+ out.collect(Tuple2.of((String) record.get(recordPropertyToAggregate).toString(), 1));
}
}
}
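
For orientation, here is a minimal standalone sketch (not part of this PR's diff) of how the query-based source added in this change could be wired into a pipeline of one's own. It assumes the GOOGLE_APPLICATION_CREDENTIALS environment variable is set, that BigQuerySource lives under the com.google.cloud.flink.bigquery.source package as in this repository, and it uses placeholder values for the project id, the SQL text, and the record limit.

// Hypothetical standalone use of the query-based source introduced in this PR.
// "my-project", the SQL text, and the limit of 1000 are placeholders; the example jar
// itself passes -1 for the limit when --limit is not supplied.
import com.google.cloud.flink.bigquery.source.BigQuerySource; // package assumed from this repo
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class QuerySourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Mirror the example's default checkpointing interval of 60 seconds.
        env.enableCheckpointing(60_000L);

        // readAvrosFromQuery(query, projectId, limit) reads the query's result rows as Avro
        // GenericRecords, as done in runFlinkQueryJob above.
        BigQuerySource<GenericRecord> source =
                BigQuerySource.readAvrosFromQuery(
                        "SELECT * FROM some_dataset.INFORMATION_SCHEMA.PARTITIONS",
                        "my-project",
                        1000);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "BigQueryQuerySource")
                .print();

        env.execute("BigQuery query source sketch");
    }
}

The equivalent run of the packaged example would pass --gcp-project, --query, and --agg-prop (plus optional --limit and --checkpoint-interval) on the flink run command line, as described in the javadoc above.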