
Use Open Clover for code coverage analysis #48

Merged

68 commits:

eb9dc8f
cleaning up the main branch for new version.
prodriguezdefino Jun 27, 2023
0695673
updates in ignore and readme files
prodriguezdefino Jun 27, 2023
e36cc44
prepping the pom addition, added parent's compliance tools
prodriguezdefino Jun 27, 2023
e167803
adding parent pom and the connector impl project pom
prodriguezdefino Jun 28, 2023
8043378
adding common functionalities
prodriguezdefino Jun 28, 2023
b455b4e
added the bigquery services wrapper and factories
prodriguezdefino Jun 28, 2023
27cd837
creates the split, its state and the enumerator state
prodriguezdefino Jun 28, 2023
4d4b60f
added configs, split reader and split assignment impls
prodriguezdefino Jun 28, 2023
0567b58
applying recommendations from sonartype-lift
prodriguezdefino Jun 28, 2023
9006714
adding the Datastream source implementation for BigQuery
prodriguezdefino Jun 28, 2023
d5d95bf
added Table API implementation for BigQuery
prodriguezdefino Jun 29, 2023
1263768
adding the example and shaded distro jar, fixes a NPE when the provid…
prodriguezdefino Jun 29, 2023
5743292
merge changes from master (previous pom deletion resolution)
prodriguezdefino Jul 6, 2023
cab9115
fixing the package name for the schema namespace
prodriguezdefino Jul 6, 2023
3375d54
Merge branch 'common_code_source' into bq_services_wrappers
prodriguezdefino Jul 10, 2023
849f769
merged main branch and took care of few lift comments
prodriguezdefino Jul 11, 2023
fd70b95
Merge branch 'bq_services_wrappers' into source_splits
prodriguezdefino Jul 11, 2023
0f18d14
merge from source_split
prodriguezdefino Jul 11, 2023
6b08119
fixing lift recommendations and spotless
prodriguezdefino Jul 11, 2023
5973d26
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Jul 11, 2023
2d9635d
Merge branch 'source_functionality' into table_api
prodriguezdefino Jul 11, 2023
135beeb
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Jul 11, 2023
3ffb582
fixes namespace error in test and spotless
prodriguezdefino Jul 11, 2023
cc45b2f
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Jul 11, 2023
f473d57
addressing comments from review
prodriguezdefino Jul 27, 2023
c178f83
merge from main
prodriguezdefino Aug 1, 2023
09eaaa4
merge from master
prodriguezdefino Aug 1, 2023
def3cc4
Merge branch 'source_splits' into split_assigner_and_reader
prodriguezdefino Aug 1, 2023
ceabb12
fixed type reference Int -> Long
prodriguezdefino Aug 1, 2023
0dc8875
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Aug 3, 2023
963c80b
Merge branch 'source_functionality' into table_api
prodriguezdefino Aug 3, 2023
5206ef1
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Aug 3, 2023
1734bac
merge from main
prodriguezdefino Aug 8, 2023
e96ff59
addressing comments from review
prodriguezdefino Aug 8, 2023
3b78492
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Aug 8, 2023
c492f02
improved hashcode and equals readability
prodriguezdefino Aug 8, 2023
6a05498
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Aug 8, 2023
820fb3b
Merge branch 'source_functionality' into table_api
prodriguezdefino Aug 8, 2023
dd5be94
improve readibility for hashcode and equals
prodriguezdefino Aug 8, 2023
fbd07c6
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Aug 8, 2023
9aae0af
changed tests to use google-truth instead of junit or assertj asserti…
prodriguezdefino Aug 9, 2023
61e5644
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Aug 9, 2023
517de82
added google-truth to tests
prodriguezdefino Aug 9, 2023
d916a1c
Merge branch 'source_functionality' into table_api
prodriguezdefino Aug 9, 2023
11fce0d
added google-truth to tests
prodriguezdefino Aug 9, 2023
099078e
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Aug 10, 2023
9da7706
merge from master after #44
prodriguezdefino Oct 31, 2023
a10470c
Merge branch 'source_functionality' into table_api
prodriguezdefino Oct 31, 2023
6b28a0c
removing guava dependency from file
prodriguezdefino Nov 1, 2023
7160ff9
merge from master
prodriguezdefino Nov 1, 2023
87780c5
adding serializable autovalue annotation to avoid storing an Optional…
prodriguezdefino Nov 2, 2023
3f4d1be
addressing comments from review
prodriguezdefino Nov 2, 2023
c8eb789
addressed comments from review
prodriguezdefino Nov 2, 2023
fd79610
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Nov 2, 2023
cce6567
partition list in 1024 chunks
prodriguezdefino Nov 3, 2023
f36f91f
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Nov 3, 2023
5260073
Address review comments
jayehwhyehentee Nov 23, 2023
936076b
adding the example and shaded distro jar, fixes a NPE when the provid…
prodriguezdefino Jun 29, 2023
7977576
Merge branch 'add_example_and_shadedsqljar' of https://github.com/pro…
jayehwhyehentee Nov 24, 2023
1460af6
added covertura analysis through open clover and improved few tests
prodriguezdefino Jun 30, 2023
a8499c3
removed repeated mvn reference from command
prodriguezdefino Jul 11, 2023
229f33d
Merge branch 'main' into use_openclover_coverage
jayehwhyehentee Nov 24, 2023
68d0bf9
Merge branch 'main' into add_example_and_shadedsqljar
jayehwhyehentee Nov 24, 2023
608e3c7
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
jayehwhyehentee Nov 24, 2023
c9f73ee
Address review comments
jayehwhyehentee Nov 27, 2023
8c9cc26
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
jayehwhyehentee Nov 27, 2023
27c34d0
Make checkpoint interval configurable in example jar
jayehwhyehentee Nov 27, 2023
fa4f5a4
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
jayehwhyehentee Nov 27, 2023
2 changes: 1 addition & 1 deletion README.md
@@ -22,7 +22,7 @@ Prerequisites:

```
git clone https://github.com/GoogleCloudDataproc/flink-bigquery-connector
-cd flink-connector-bigquery
+cd flink-bigquery-connector
mvn clean package -DskipTests
```

16 changes: 2 additions & 14 deletions cloudbuild/cloudbuild.yaml
@@ -13,27 +13,15 @@ steps:
     env:
     - 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
 
-  # 3. Run unit tests
+  # 3. Run unit & integration tests
   - name: 'gcr.io/$PROJECT_ID/dataproc-flink-bigquery-connector-presubmit'
     id: 'unit-tests'
     waitFor: ['init']
     entrypoint: 'bash'
-    args: ['/workspace/cloudbuild/presubmit.sh', 'unittest']
+    args: ['/workspace/cloudbuild/presubmit.sh', 'tests']
     env:
     - 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
 
-  # 4. Run integration tests concurrently with unit tests
-  # Commeneted out until integration tests are ported
-  # - name: 'gcr.io/$PROJECT_ID/dataproc-flink-bigquery-connector-presubmit'
-  #   id: 'integration-tests'
-  #   waitFor: ['unit-tests']
-  #   entrypoint: 'bash'
-  #   args: ['/workspace/cloudbuild/presubmit.sh', 'integrationtest']
-  #   env:
-  #   - 'GOOGLE_CLOUD_PROJECT=${_GOOGLE_CLOUD_PROJECT}'
-  #   - 'TEMPORARY_GCS_BUCKET=${_TEMPORARY_GCS_BUCKET}'
-  #   - 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
 
 # Tests take around 20 mins in general.
 timeout: 1800s

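For local verification, the same pipeline can be triggered by hand. A minimal sketch, assuming `gcloud` is authenticated against the right project and a `_CODECOV_TOKEN` substitution is defined (the value below is a placeholder):

```
# Sketch: run the presubmit pipeline manually with Cloud Build.
# The substitution value is a placeholder, not a real token.
gcloud builds submit --config cloudbuild/cloudbuild.yaml \
    --substitutions=_CODECOV_TOKEN=placeholder-token .
```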
13 changes: 4 additions & 9 deletions cloudbuild/presubmit.sh
@@ -30,18 +30,13 @@ cd /workspace
 case $STEP in
   # Download maven and all the dependencies
   init)
-    $MVN install -DskipTests
+    $MVN clean install -DskipTests
     exit
     ;;
 
-  # Run unit tests
-  unittest)
-    $MVN test jacoco:report jacoco:report-aggregate
-    ;;
-
-  # Run integration tests
-  integrationtest)
-    $MVN failsafe:integration-test failsafe:verify jacoco:report jacoco:report-aggregate
+  # Run unit & integration tests
+  tests)
+    $MVN clean clover:setup verify clover:aggregate clover:clover -Pclover -pl flink-connector-bigquery
     ;;
 
   *)
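The new `tests` step drives OpenClover end to end. A minimal sketch of reproducing it locally, assuming Maven is installed and the `clover` profile referenced by `-Pclover` is defined in the parent pom:

```
# Instrument sources, run unit & integration tests, then aggregate and render
# the OpenClover coverage report for the connector module.
mvn clean clover:setup verify clover:aggregate clover:clover \
    -Pclover -pl flink-connector-bigquery
# By default the clover-maven-plugin writes the HTML report under the
# module's target/site/clover/ directory.
```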
131 changes: 131 additions & 0 deletions flink-connector-bigquery-examples/pom.xml
@@ -0,0 +1,131 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-bigquery-parent</artifactId>
        <version>1.1-SNAPSHOT</version>
    </parent>

    <artifactId>flink-connector-bigquery-examples</artifactId>
    <name>Flink : Connectors : Google BigQuery Examples</name>
    <packaging>jar</packaging>

    <properties>
        <japicmp.skip>true</japicmp.skip>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-bigquery</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>BigQueryExample</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-deploy-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade-flink</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>false</shadedArtifactAttached>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <resources>
                                <resource>
                                    <directory>src/test/resources</directory>
                                </resource>
                            </resources>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.apache.flink.examples.gcp.bigquery.BigQueryExample</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                    <resource>log4j2-bqexample.properties</resource>
                                    <file>src/main/resources/log4j2-bqexample.properties</file>
                                </transformer>
                            </transformers>
                            <artifactSet>
                                <includes>
                                    <include>org.apache.flink:flink-connector-bigquery</include>
                                    <include>org.apache.flink:flink-avro</include>
                                    <include>org.apache.flink:flink-metrics-dropwizard</include>
                                    <include>com.google.cloud:google-cloud-bigquerystorage</include>
                                    <include>com.google.*:*</include>
                                    <include>commons-codec:commons-codec</include>
                                    <include>dev.failsafe:*</include>
                                    <include>org.apache.avro:*</include>
                                    <include>org.apache.httpcomponents:*</include>
                                    <include>org.codehaus.mojo:animal-sniffer-annotations</include>
                                    <include>org.conscrypt:*</include>
                                    <include>com.fasterxml.jackson.*:*</include>
                                    <include>org.threeten:*</include>
                                    <include>org.checkerframework:*</include>
                                    <include>io.dropwizard.metrics:*</include>
                                    <include>io.grpc:*</include>
                                    <include>io.opencensus:*</include>
                                    <include>io.perfmark:*</include>
                                </includes>
                            </artifactSet>
                            <relocations>
                                <relocation>
                                    <pattern>com.google</pattern>
                                    <shadedPattern>org.apache.flink.examples.gcp.bigquery.shaded.com.google</shadedPattern>
                                </relocation>
                            </relocations>

                            <filters>
                                <filter>
                                    <artifact>org.apache.flink:flink-connector-bigquery-examples*</artifact>
                                    <includes>
                                        <include>org/apache/flink/examples/gcp/bigquery/**</include>
                                    </includes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
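Given this module definition, the self-contained example jar can be produced with a standard Maven invocation. A sketch, assuming the build runs from the repository root:

```
# Build the shaded example jar; <finalName> names it BigQueryExample.jar and the
# shade plugin relocates the bundled Google dependencies under
# org.apache.flink.examples.gcp.bigquery.shaded.
mvn clean package -DskipTests -pl flink-connector-bigquery-examples -am
ls flink-connector-bigquery-examples/target/BigQueryExample.jar
```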
140 changes: 140 additions & 0 deletions flink-connector-bigquery-examples/src/main/java/org/apache/flink/examples/gcp/bigquery/BigQueryExample.java
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.examples.gcp.bigquery;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.source.BigQuerySource;
import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A simple BigQuery table read example with Flink's DataStream API.
 *
 * <p>The Flink pipeline reads the specified BigQuery table, optionally applying the given row
 * restriction and limit count, returns {@link GenericRecord} instances representing the rows,
 * and finally prints some aggregated values for the provided payload field.
 *
 * <p>Note on row restriction: when a restriction contains a temporal reference, e.g. {@code
 * "TIMESTAMP_TRUNC(ingestion_timestamp, HOUR) = '2023-06-20 19:00:00'"}, and the job is
 * launched through Flink's REST API, the single quotes are not supported and will make the
 * pipeline fail. As a workaround, use \u0027 as a replacement, e.g. {@code
 * "TIMESTAMP_TRUNC(ingestion_timestamp, HOUR) = \u00272023-06-20 19:00:00\u0027"}.
 */
public class BigQueryExample {

    private static final Logger LOG = LoggerFactory.getLogger(BigQueryExample.class);

    public static void main(String[] args) throws Exception {
        // parse input arguments
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);

        if (parameterTool.getNumberOfParameters() < 4) {
            LOG.error(
                    "Missing parameters!\n"
                            + "Usage: flink run <additional runtime params> BigQuery.jar"
                            + " --gcp-project <gcp-project> --bq-dataset <dataset name>"
                            + " --bq-table <table name> --agg-prop <payload's property>"
                            + " --restriction <single-quoted string with row predicate>"
                            + " --limit <optional: limit records returned>");
            return;
        }

        String projectName = parameterTool.getRequired("gcp-project");
        String datasetName = parameterTool.getRequired("bq-dataset");
        String tableName = parameterTool.getRequired("bq-table");
        String rowRestriction = parameterTool.get("restriction", "").replace("\\u0027", "'");
        Integer recordLimit = parameterTool.getInt("limit", -1);
        String recordPropertyToAggregate = parameterTool.getRequired("agg-prop");

        runFlinkJob(
                projectName,
                datasetName,
                tableName,
                recordPropertyToAggregate,
                rowRestriction,
                recordLimit);
    }

    private static void runFlinkJob(
            String projectName,
            String datasetName,
            String tableName,
            String recordPropertyToAggregate,
            String rowRestriction,
            Integer limit)
            throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000L);
[Collaborator review comment] Can we get this as an user provided configuration
[Collaborator reply] Addressed the same concern in previous PR (#47).
        /*
         * We will be reading Avro generic records from BigQuery, and in this case we assume
         * the GOOGLE_APPLICATION_CREDENTIALS env variable is present in the execution runtime.
         * If different authentication is needed, the credentials builder (part of the
         * BigQueryConnectOptions) should enable capturing the credentials from various sources.
         */
        BigQuerySource<GenericRecord> bqSource =
                BigQuerySource.readAvros(
                        BigQueryReadOptions.builder()
                                .setBigQueryConnectOptions(
                                        BigQueryConnectOptions.builder()
                                                .setProjectId(projectName)
                                                .setDataset(datasetName)
                                                .setTable(tableName)
                                                .build())
                                .setRowRestriction(rowRestriction)
                                .build(),
                        limit);

        env.fromSource(bqSource, WatermarkStrategy.noWatermarks(), "BigQuerySource")
                .flatMap(new FlatMapper(recordPropertyToAggregate))
                .keyBy(t -> t.f0)
                .max("f1")
                .print();

        env.execute("Flink BigQuery Example");
    }

    static class FlatMapper implements FlatMapFunction<GenericRecord, Tuple2<String, Integer>> {

        private final String recordPropertyToAggregate;

        public FlatMapper(String recordPropertyToAggregate) {
            this.recordPropertyToAggregate = recordPropertyToAggregate;
        }

        @Override
        public void flatMap(GenericRecord record, Collector<Tuple2<String, Integer>> out)
                throws Exception {
            out.collect(Tuple2.of(record.get(recordPropertyToAggregate).toString(), 1));
        }
    }
}
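Putting the usage string and the row-restriction note together, a hypothetical submission could look as follows (every value is a placeholder; note the \u0027 workaround for single quotes when launching through Flink's REST API):

```
# Sketch: submit the example to a running Flink cluster. All parameter values
# are placeholders for illustration.
flink run BigQueryExample.jar \
    --gcp-project my-gcp-project \
    --bq-dataset my_dataset \
    --bq-table my_table \
    --agg-prop name \
    --restriction "TIMESTAMP_TRUNC(ingestion_timestamp, HOUR) = \u00272023-06-20 19:00:00\u0027" \
    --limit 1000
```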
48 changes: 0 additions & 48 deletions flink-connector-bigquery/pom.xml
@@ -162,58 +162,10 @@ under the License.
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-jar-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>test-jar</goal>
-                        </goals>
-                    </execution>
-                </executions>
             </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>default-test</id>
-                        <phase>test</phase>
-                        <goals>
-                            <goal>test</goal>
-                        </goals>
-                        <configuration>
-                            <argLine>${argLine} -XX:+UseG1GC -Xms256m -Xmx1024m</argLine>
-                        </configuration>
-                    </execution>
-                    <execution>
-                        <id>integration-tests</id>
-                        <phase>integration-test</phase>
-                        <goals>
-                            <goal>test</goal>
-                        </goals>
-                        <configuration>
-                            <argLine>-XX:+UseG1GC -Xms256m -Xmx2048m</argLine>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.jacoco</groupId>
-                <artifactId>jacoco-maven-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>prepare-agent</id>
-                        <goals>
-                            <goal>prepare-agent</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>report</id>
-                        <phase>install</phase>
-                        <goals>
-                            <goal>report</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
         </plugins>
     </build>