Implements the Table API for BigQuery (#46)
By building on the BigQuerySource, we provide a Table API implementation with a higher degree of customization, supporting projection, restriction, and partition pushdown.
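For illustration, this is roughly the shape of Table API program the change targets. The sketch below is hypothetical: the 'bigquery' connector identifier and the option keys are placeholders, not necessarily the names this commit registers.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class BigQueryTableApiSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical DDL: connector identifier and option keys are placeholders.
        tEnv.executeSql(
                "CREATE TABLE events ("
                        + "  event_id STRING,"
                        + "  event_ts TIMESTAMP(3),"
                        + "  payload STRING"
                        + ") WITH ("
                        + "  'connector' = 'bigquery',"
                        + "  'project' = 'some-gcp-project',"
                        + "  'dataset' = 'some_dataset',"
                        + "  'table' = 'events'"
                        + ")");

        // A query like this exercises the pushdown paths: the SELECT list is a
        // projection and the WHERE clause a restriction that the planner can
        // push down to the BigQuery read session.
        Table result =
                tEnv.sqlQuery(
                        "SELECT event_id FROM events "
                                + "WHERE event_ts > TIMESTAMP '2023-11-01 00:00:00'");
        result.execute().print();
    }
}
```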

---------

Co-authored-by: Jayant Jain <jainjayant@google.com>
prodriguezdefino and jayehwhyehentee authored Nov 23, 2023
1 parent 164f8fe commit d5737b6
Showing 19 changed files with 1,567 additions and 94 deletions.
flink-connector-bigquery/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -191,7 +191,7 @@ under the License.
<goal>test</goal>
</goals>
<configuration>
<argLine>${argLine} -XX:+UseG1GC -Xms256m -Xmx2048m</argLine>
<argLine>-XX:+UseG1GC -Xms256m -Xmx2048m</argLine>
</configuration>
</execution>
</executions>
@@ -20,6 +20,7 @@
import org.apache.flink.util.function.SerializableSupplier;

import com.google.auto.value.AutoValue;
import com.google.auto.value.extension.serializable.SerializableAutoValue;
import com.google.cloud.flink.bigquery.services.BigQueryServices;

import javax.annotation.Nullable;
@@ -30,6 +31,7 @@

/** BigQuery client connection configuration. */
@AutoValue
@SerializableAutoValue
@PublicEvolving
public abstract class BigQueryConnectOptions implements Serializable {

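@SerializableAutoValue is the AutoValue extension that makes the generated value subclass cooperate with Java serialization, which matters here because Flink serializes connector options when shipping them to task managers. A minimal sketch of the pattern, with illustrative class and field names:

```java
import com.google.auto.value.AutoValue;
import com.google.auto.value.extension.serializable.SerializableAutoValue;

import java.io.Serializable;

/** Minimal sketch of the AutoValue + SerializableAutoValue pattern. */
@AutoValue
@SerializableAutoValue
public abstract class ExampleConnectOptions implements Serializable {

    public abstract String getProjectId();

    public abstract String getDataset();

    public static Builder builder() {
        // AutoValue generates the concrete subclass and this builder.
        return new AutoValue_ExampleConnectOptions.Builder();
    }

    /** Builder generated by AutoValue. */
    @AutoValue.Builder
    public abstract static class Builder {
        public abstract Builder setProjectId(String projectId);

        public abstract Builder setDataset(String dataset);

        public abstract ExampleConnectOptions build();
    }
}
```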
@@ -16,20 +16,20 @@

package com.google.cloud.flink.bigquery.common.utils;

import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableCollection;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMultimap;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -42,36 +42,40 @@
*/
public class SchemaTransform {

static final String NAMESPACE = "com.google.cloud.flink.bigquery";
public static final String DEFAULT_NAMESPACE = "com.google.cloud.flink.bigquery";
/**
* Defines the valid mapping between BigQuery types and native Avro types.
*
* <p>Some BigQuery types map to more than one Avro type here, since slightly different Avro
* records are produced when exporting data in Avro format and when reading data directly using
* the Read API.
*/
static final ImmutableMultimap<String, Schema.Type> BIG_QUERY_TO_AVRO_TYPES =
ImmutableMultimap.<String, Schema.Type>builder()
.put("STRING", Schema.Type.STRING)
.put("GEOGRAPHY", Schema.Type.STRING)
.put("BYTES", Schema.Type.BYTES)
.put("INTEGER", Schema.Type.LONG)
.put("INT64", Schema.Type.LONG)
.put("FLOAT", Schema.Type.DOUBLE)
.put("FLOAT64", Schema.Type.DOUBLE)
.put("NUMERIC", Schema.Type.BYTES)
.put("BIGNUMERIC", Schema.Type.BYTES)
.put("BOOLEAN", Schema.Type.BOOLEAN)
.put("BOOL", Schema.Type.BOOLEAN)
.put("TIMESTAMP", Schema.Type.LONG)
.put("RECORD", Schema.Type.RECORD)
.put("STRUCT", Schema.Type.RECORD)
.put("DATE", Schema.Type.STRING)
.put("DATE", Schema.Type.INT)
.put("DATETIME", Schema.Type.STRING)
.put("TIME", Schema.Type.STRING)
.put("TIME", Schema.Type.LONG)
.put("JSON", Schema.Type.STRING)
.build();
static final Map<String, List<Schema.Type>> BIG_QUERY_TO_AVRO_TYPES =
initializeBigQueryToAvroTypesMapping();

private static Map<String, List<Schema.Type>> initializeBigQueryToAvroTypesMapping() {
Map<String, List<Schema.Type>> mapping = new HashMap<>();

mapping.put("STRING", Arrays.asList(Schema.Type.STRING));
mapping.put("GEOGRAPHY", Arrays.asList(Schema.Type.STRING));
mapping.put("BYTES", Arrays.asList(Schema.Type.BYTES));
mapping.put("INTEGER", Arrays.asList(Schema.Type.LONG));
mapping.put("INT64", Arrays.asList(Schema.Type.LONG));
mapping.put("FLOAT", Arrays.asList(Schema.Type.DOUBLE));
mapping.put("FLOAT64", Arrays.asList(Schema.Type.DOUBLE));
mapping.put("NUMERIC", Arrays.asList(Schema.Type.BYTES));
mapping.put("BIGNUMERIC", Arrays.asList(Schema.Type.BYTES));
mapping.put("BOOLEAN", Arrays.asList(Schema.Type.BOOLEAN));
mapping.put("BOOL", Arrays.asList(Schema.Type.BOOLEAN));
mapping.put("TIMESTAMP", Arrays.asList(Schema.Type.LONG));
mapping.put("RECORD", Arrays.asList(Schema.Type.RECORD));
mapping.put("STRUCT", Arrays.asList(Schema.Type.RECORD));
mapping.put("DATE", Arrays.asList(Schema.Type.STRING, Schema.Type.INT));
mapping.put("DATETIME", Arrays.asList(Schema.Type.STRING));
mapping.put("TIME", Arrays.asList(Schema.Type.STRING, Schema.Type.LONG));
mapping.put("JSON", Arrays.asList(Schema.Type.STRING));

return mapping;
}

public static Schema toGenericAvroSchema(
String schemaName, List<TableFieldSchema> fieldSchemas, String namespace) {
@@ -86,15 +90,17 @@ public static Schema toGenericAvroSchema(
return Schema.createRecord(
schemaName,
"Translated Avro Schema for " + schemaName,
namespace == null ? NAMESPACE : namespace,
namespace == null ? DEFAULT_NAMESPACE : namespace,
false,
avroFields);
}

public static Schema toGenericAvroSchema(
String schemaName, List<TableFieldSchema> fieldSchemas) {
return toGenericAvroSchema(
schemaName, fieldSchemas, hasNamespaceCollision(fieldSchemas) ? NAMESPACE : null);
schemaName,
fieldSchemas,
hasNamespaceCollision(fieldSchemas) ? DEFAULT_NAMESPACE : null);
}

// To maintain backwards compatibility we only disambiguate collisions in the field namespaces
@@ -122,8 +128,7 @@ private static boolean hasNamespaceCollision(List<TableFieldSchema> fieldSchemas
"nullness" // Avro library not annotated
})
private static Schema.Field convertField(TableFieldSchema bigQueryField, String namespace) {
ImmutableCollection<Schema.Type> avroTypes =
BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType());
// getOrDefault avoids an NPE for unmapped BigQuery types, matching the old
// Multimap behavior of returning an empty collection.
List<Schema.Type> avroTypes =
        BIG_QUERY_TO_AVRO_TYPES.getOrDefault(bigQueryField.getType(), new ArrayList<>());
if (avroTypes.isEmpty()) {
throw new IllegalArgumentException(
"Unable to map BigQuery field type "
@@ -195,29 +200,21 @@ static List<TableFieldSchema> fieldListToListOfTableFieldSchema(FieldList fieldL
.map(
fList ->
fList.stream()
.map(
field ->
new TableFieldSchema()
.setName(field.getName())
.setDescription(
field.getDescription())
.setDefaultValueExpression(
field
.getDefaultValueExpression())
.setCollation(field.getCollation())
.setMode(
Optional.ofNullable(
field
.getMode())
.map(m -> m.name())
.orElse(null))
.setType(field.getType().name())
.setFields(
fieldListToListOfTableFieldSchema(
field
.getSubFields())))
.map(field -> fieldToTableFieldSchema(field))
.collect(Collectors.toList()))
.orElse(Lists.newArrayList());
.orElse(new ArrayList<>());
}

static TableFieldSchema fieldToTableFieldSchema(Field field) {

return new TableFieldSchema()
.setName(field.getName())
.setDescription(field.getDescription())
.setDefaultValueExpression(field.getDefaultValueExpression())
.setCollation(field.getCollation())
.setMode(Optional.ofNullable(field.getMode()).map(m -> m.name()).orElse(null))
.setType(field.getType().name())
.setFields(fieldListToListOfTableFieldSchema(field.getSubFields()));
}

/**
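Based on the signatures shown above, converting a BigQuery table schema to Avro might look like the following sketch; the field definitions are illustrative:

```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.cloud.flink.bigquery.common.utils.SchemaTransform;
import org.apache.avro.Schema;

import java.util.Arrays;
import java.util.List;

public class SchemaTransformSketch {
    public static void main(String[] args) {
        // Illustrative fields: per the mapping above, STRING -> Schema.Type.STRING
        // and TIMESTAMP -> Schema.Type.LONG.
        List<TableFieldSchema> fields =
                Arrays.asList(
                        new TableFieldSchema()
                                .setName("name")
                                .setType("STRING")
                                .setMode("REQUIRED"),
                        new TableFieldSchema()
                                .setName("created")
                                .setType("TIMESTAMP")
                                .setMode("NULLABLE"));

        Schema avroSchema = SchemaTransform.toGenericAvroSchema("example_table", fields);
        System.out.println(avroSchema.toString(true));
    }
}
```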
@@ -20,8 +20,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
@@ -53,6 +51,7 @@
import org.threeten.bp.Duration;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@@ -179,7 +178,7 @@ public QueryDataClientImpl(CredentialsOptions options) {
public List<String> retrieveTablePartitions(String project, String dataset, String table) {
try {
String query =
Lists.newArrayList(
Arrays.asList(
"SELECT",
" partition_id",
"FROM",
@@ -217,7 +216,7 @@ public Optional<Tuple2<String, StandardSQLTypeName>> retrievePartitionColumnName
String project, String dataset, String table) {
try {
String query =
Lists.newArrayList(
Arrays.asList(
"SELECT",
" column_name, data_type",
"FROM",
@@ -234,6 +233,8 @@ public Optional<Tuple2<String, StandardSQLTypeName>> retrievePartitionColumnName

QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build();

// TODO: change this method to use getTable method, see comment:
// https://github.com/GoogleCloudDataproc/flink-bigquery-connector/pull/46#discussion_r1371229725
TableResult results = bigQuery.query(queryConfig);

return StreamSupport.stream(results.iterateAll().spliterator(), false)
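The two partition-metadata methods above assemble their SQL from a list of fragments; here is a self-contained sketch of the kind of INFORMATION_SCHEMA query being built. The exact fragments, predicates, and join separator are assumptions, not the connector's verbatim query:

```java
import java.util.Arrays;
import java.util.List;

public class PartitionQuerySketch {
    public static void main(String[] args) {
        String project = "some-gcp-project";
        String dataset = "some_dataset";
        String table = "events";

        // Fragments in the style of retrieveTablePartitions; the predicate and
        // the newline separator are assumptions.
        List<String> fragments =
                Arrays.asList(
                        "SELECT",
                        " partition_id",
                        "FROM",
                        String.format(
                                " `%s.%s.INFORMATION_SCHEMA.PARTITIONS`", project, dataset),
                        "WHERE",
                        String.format(" table_name = '%s'", table));

        String query = String.join("\n", fragments);
        System.out.println(query);
    }
}
```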
@@ -101,13 +101,12 @@ public Optional<String> getDestinationTable() {

@Override
public int hashCode() {
int hash = 7;
hash = 97 * hash + Objects.hashCode(this.status);
hash = 97 * hash + Objects.hashCode(this.errorMessages);
hash = 97 * hash + Objects.hashCode(this.destinationProject);
hash = 97 * hash + Objects.hashCode(this.destinationDataset);
hash = 97 * hash + Objects.hashCode(this.destinationTable);
return hash;
return Objects.hash(
getStatus(),
getErrorMessages(),
getDestinationProject(),
getDestinationDataset(),
getDestinationTable());
}

@Override
@@ -122,19 +121,11 @@ public boolean equals(Object obj) {
return false;
}
final QueryResultInfo other = (QueryResultInfo) obj;
if (!Objects.equals(this.destinationProject, other.destinationProject)) {
return false;
}
if (!Objects.equals(this.destinationDataset, other.destinationDataset)) {
return false;
}
if (!Objects.equals(this.destinationTable, other.destinationTable)) {
return false;
}
if (this.status != other.status) {
return false;
}
return Objects.equals(this.errorMessages, other.errorMessages);
return Objects.equals(this.destinationProject, other.destinationProject)
&& Objects.equals(this.destinationDataset, other.destinationDataset)
&& Objects.equals(this.destinationTable, other.destinationTable)
&& Objects.equals(this.status, other.status)
&& Objects.equals(this.errorMessages, other.errorMessages);
}

@Override
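The rewrite above swaps a hand-rolled hash accumulator for java.util.Objects utilities. The key invariant is that hashCode() covers exactly the fields that equals() compares, so equal instances always hash alike; a minimal sketch of the pairing:

```java
import java.util.Objects;

/** Minimal sketch of the Objects.hash / Objects.equals pairing. */
public final class ResultInfoSketch {
    private final String status;
    private final String destinationTable;

    public ResultInfoSketch(String status, String destinationTable) {
        this.status = status;
        this.destinationTable = destinationTable;
    }

    @Override
    public int hashCode() {
        // Covers exactly the fields compared in equals().
        return Objects.hash(status, destinationTable);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof ResultInfoSketch)) {
            return false;
        }
        ResultInfoSketch other = (ResultInfoSketch) obj;
        return Objects.equals(this.status, other.status)
                && Objects.equals(this.destinationTable, other.destinationTable);
    }
}
```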
@@ -17,6 +17,7 @@
package com.google.cloud.flink.bigquery.source;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
@@ -204,6 +205,27 @@ public static BigQuerySource<GenericRecord> readAvrosFromQuery(
.setQueryAndExecutionProject(query, gcpProject)
.build();

return readAvrosFromQuery(readOptions, query, gcpProject, limit);
}

/**
* Creates an instance of the source, setting Avro {@link GenericRecord} as the return type for
* the data (mimicking the table's schema), limiting record retrieval to the provided limit, and
* reading data from the provided query, which will be executed in the provided GCP project.
*
* @param readOptions The BigQuery read options to use.
* @param query A BigQuery standard SQL query.
* @param gcpProject The GCP project where the provided query will be executed.
* @param limit The maximum number of records to be returned.
* @return A fully initialized instance of the source, ready to read {@link GenericRecord} from
* the BigQuery query results.
* @throws IOException if the source's BigQuery services cannot be initialized.
*/
@VisibleForTesting
static BigQuerySource<GenericRecord> readAvrosFromQuery(
BigQueryReadOptions readOptions, String query, String gcpProject, Integer limit)
throws IOException {
BigQueryConnectOptions connectOptions = readOptions.getBigQueryConnectOptions();
TableSchema tableSchema =
BigQueryServicesFactory.instance(connectOptions)
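Wiring the source into a DataStream job might look like this sketch. It assumes the public readAvrosFromQuery(query, gcpProject, limit) overload visible in the diff above; the query and project values are placeholders:

```java
import com.google.cloud.flink.bigquery.source.BigQuerySource;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BigQuerySourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder query and project; the limit caps the records returned.
        BigQuerySource<GenericRecord> source =
                BigQuerySource.readAvrosFromQuery(
                        "SELECT name, number FROM `some_dataset.some_table`",
                        "some-gcp-project",
                        1000);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "BigQueryQuerySource")
                .print();

        env.execute("bigquery-query-sketch");
    }
}
```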
@@ -22,6 +22,7 @@
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;

import com.google.auto.value.AutoValue;
import com.google.auto.value.extension.serializable.SerializableAutoValue;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.common.config.CredentialsOptions;
import org.threeten.bp.Instant;
@@ -37,6 +38,7 @@

/** The options available to read data from BigQuery. */
@AutoValue
@SerializableAutoValue
@PublicEvolving
public abstract class BigQueryReadOptions implements Serializable {

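The read options object above is what carries these settings into the source. Based on the setQueryAndExecutionProject call visible earlier in this diff, assembling it might look like this sketch; the builder() entry point and the package path are assumptions following the AutoValue builder convention:

```java
import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions;

public class ReadOptionsSketch {
    public static void main(String[] args) throws Exception {
        // setQueryAndExecutionProject appears in this diff; builder() is assumed.
        BigQueryReadOptions readOptions =
                BigQueryReadOptions.builder()
                        .setQueryAndExecutionProject(
                                "SELECT name FROM `some_dataset.some_table`",
                                "some-gcp-project")
                        .build();

        System.out.println(readOptions.getBigQueryConnectOptions());
    }
}
```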
