Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
prodriguezdefino committed Nov 2, 2023
2 parents 8dbf958 + 8008260 commit c923f31
Show file tree
Hide file tree
Showing 12 changed files with 315 additions and 262 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.util.function.SerializableSupplier;

import com.google.auto.value.AutoValue;
import com.google.auto.value.extension.serializable.SerializableAutoValue;
import com.google.cloud.flink.bigquery.services.BigQueryServices;

import javax.annotation.Nullable;
Expand All @@ -30,6 +31,7 @@

/** BigQuery client connection configuration. */
@AutoValue
@SerializableAutoValue
@PublicEvolving
public abstract class BigQueryConnectOptions implements Serializable {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2023 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.common.exceptions;

/** Represents a general error during the execution of the connector's code. */
public class BigQueryConnectorException extends RuntimeException {

    // RuntimeException is Serializable; pin the serialization id explicitly so the
    // wire format stays stable across connector releases.
    private static final long serialVersionUID = 1L;

    /**
     * Creates an exception describing an error in the connector's execution.
     *
     * @param message Description of the error condition.
     */
    public BigQueryConnectorException(String message) {
        super(message);
    }

    /**
     * Creates an exception describing an error in the connector's execution, preserving the
     * underlying cause for diagnostics.
     *
     * @param message Description of the error condition.
     * @param cause The originating throwable.
     */
    public BigQueryConnectorException(String message, Throwable cause) {
        super(message, cause);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@

package com.google.cloud.flink.bigquery.common.utils;

import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableCollection;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMultimap;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand All @@ -49,29 +49,33 @@ public class SchemaTransform {
* <p>Some BigQuery types are duplicated here since slightly different Avro records are produced
* when exporting data in Avro format and when reading data directly using the read API.
*/
static final ImmutableMultimap<String, Schema.Type> BIG_QUERY_TO_AVRO_TYPES =
ImmutableMultimap.<String, Schema.Type>builder()
.put("STRING", Schema.Type.STRING)
.put("GEOGRAPHY", Schema.Type.STRING)
.put("BYTES", Schema.Type.BYTES)
.put("INTEGER", Schema.Type.LONG)
.put("INT64", Schema.Type.LONG)
.put("FLOAT", Schema.Type.DOUBLE)
.put("FLOAT64", Schema.Type.DOUBLE)
.put("NUMERIC", Schema.Type.BYTES)
.put("BIGNUMERIC", Schema.Type.BYTES)
.put("BOOLEAN", Schema.Type.BOOLEAN)
.put("BOOL", Schema.Type.BOOLEAN)
.put("TIMESTAMP", Schema.Type.LONG)
.put("RECORD", Schema.Type.RECORD)
.put("STRUCT", Schema.Type.RECORD)
.put("DATE", Schema.Type.STRING)
.put("DATE", Schema.Type.INT)
.put("DATETIME", Schema.Type.STRING)
.put("TIME", Schema.Type.STRING)
.put("TIME", Schema.Type.LONG)
.put("JSON", Schema.Type.STRING)
.build();
static final Map<String, List<Schema.Type>> BIG_QUERY_TO_AVRO_TYPES =
        initializeBigQueryToAvroTypesMapping();

/**
 * Builds the lookup table from BigQuery column type names to the Avro types they may appear
 * as. Several BigQuery types map to more than one Avro type because the Avro export format
 * and the Storage Read API encode them differently (e.g. DATE as STRING or INT).
 *
 * @return An unmodifiable map, so the shared static table cannot be mutated by callers.
 */
private static Map<String, List<Schema.Type>> initializeBigQueryToAvroTypesMapping() {
    Map<String, List<Schema.Type>> mapping = new HashMap<>();

    mapping.put("STRING", Arrays.asList(Schema.Type.STRING));
    mapping.put("GEOGRAPHY", Arrays.asList(Schema.Type.STRING));
    mapping.put("BYTES", Arrays.asList(Schema.Type.BYTES));
    mapping.put("INTEGER", Arrays.asList(Schema.Type.LONG));
    mapping.put("INT64", Arrays.asList(Schema.Type.LONG));
    mapping.put("FLOAT", Arrays.asList(Schema.Type.DOUBLE));
    mapping.put("FLOAT64", Arrays.asList(Schema.Type.DOUBLE));
    mapping.put("NUMERIC", Arrays.asList(Schema.Type.BYTES));
    mapping.put("BIGNUMERIC", Arrays.asList(Schema.Type.BYTES));
    mapping.put("BOOLEAN", Arrays.asList(Schema.Type.BOOLEAN));
    mapping.put("BOOL", Arrays.asList(Schema.Type.BOOLEAN));
    mapping.put("TIMESTAMP", Arrays.asList(Schema.Type.LONG));
    mapping.put("RECORD", Arrays.asList(Schema.Type.RECORD));
    mapping.put("STRUCT", Arrays.asList(Schema.Type.RECORD));
    mapping.put("DATE", Arrays.asList(Schema.Type.STRING, Schema.Type.INT));
    mapping.put("DATETIME", Arrays.asList(Schema.Type.STRING));
    mapping.put("TIME", Arrays.asList(Schema.Type.STRING, Schema.Type.LONG));
    mapping.put("JSON", Arrays.asList(Schema.Type.STRING));

    // Defensive: this map is package-visible and shared; forbid accidental mutation.
    return Collections.unmodifiableMap(mapping);
}

public static Schema toGenericAvroSchema(
String schemaName, List<TableFieldSchema> fieldSchemas, String namespace) {
Expand Down Expand Up @@ -124,8 +128,7 @@ private static boolean hasNamespaceCollision(List<TableFieldSchema> fieldSchemas
"nullness" // Avro library not annotated
})
private static Schema.Field convertField(TableFieldSchema bigQueryField, String namespace) {
ImmutableCollection<Schema.Type> avroTypes =
BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType());
List<Schema.Type> avroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType());
if (avroTypes.isEmpty()) {
throw new IllegalArgumentException(
"Unable to map BigQuery field type "
Expand Down Expand Up @@ -197,29 +200,21 @@ static List<TableFieldSchema> fieldListToListOfTableFieldSchema(FieldList fieldL
.map(
fList ->
fList.stream()
.map(
field ->
new TableFieldSchema()
.setName(field.getName())
.setDescription(
field.getDescription())
.setDefaultValueExpression(
field
.getDefaultValueExpression())
.setCollation(field.getCollation())
.setMode(
Optional.ofNullable(
field
.getMode())
.map(m -> m.name())
.orElse(null))
.setType(field.getType().name())
.setFields(
fieldListToListOfTableFieldSchema(
field
.getSubFields())))
.map(field -> fieldToTableFieldSchema(field))
.collect(Collectors.toList()))
.orElse(Lists.newArrayList());
.orElse(new ArrayList<>());
}

/**
 * Converts a BigQuery client {@link Field} into its API-model {@link TableFieldSchema}
 * equivalent, recursively converting any nested sub-fields.
 */
static TableFieldSchema fieldToTableFieldSchema(Field field) {
    // getMode() may be null for fields without an explicit mode; map to null in that case.
    String modeName = Optional.ofNullable(field.getMode()).map(mode -> mode.name()).orElse(null);

    TableFieldSchema converted = new TableFieldSchema();
    converted.setName(field.getName());
    converted.setDescription(field.getDescription());
    converted.setDefaultValueExpression(field.getDefaultValueExpression());
    converted.setCollation(field.getCollation());
    converted.setMode(modeName);
    converted.setType(field.getType().name());
    converted.setFields(fieldListToListOfTableFieldSchema(field.getSubFields()));
    return converted;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import org.apache.flink.FlinkVersion;
import org.apache.flink.annotation.Internal;

import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
Expand Down Expand Up @@ -54,6 +52,7 @@
import org.threeten.bp.Duration;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -180,7 +179,7 @@ public QueryDataClientImpl(CredentialsOptions options) {
public List<String> retrieveTablePartitions(String project, String dataset, String table) {
try {
String query =
Lists.newArrayList(
Arrays.asList(
"SELECT",
" partition_id",
"FROM",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ static BigQuerySource<GenericRecord> readAvrosFromQuery(
TableSchema tableSchema =
BigQueryServicesFactory.instance(connectOptions)
.queryClient()
.dryRunQuery(readOptions.getQueryExecutionProject(), readOptions.getQuery())
.dryRunQuery(gcpProject, query)
.getStatistics()
.getQuery()
.getSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;

import com.google.auto.value.AutoValue;
import com.google.auto.value.extension.serializable.SerializableAutoValue;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.common.config.CredentialsOptions;
import org.threeten.bp.Instant;
Expand All @@ -37,21 +38,19 @@

/** The options available to read data from BigQuery. */
@AutoValue
@SerializableAutoValue
@PublicEvolving
public abstract class BigQueryReadOptions implements Serializable {

public abstract ImmutableList<String> getColumnNames();

public abstract String getRowRestriction();

@Nullable
public abstract Long getSnapshotTimestampInMillis();
public abstract Optional<Long> getSnapshotTimestampInMillis();

@Nullable
public abstract String getQuery();
public abstract Optional<String> getQuery();

@Nullable
public abstract String getQueryExecutionProject();
public abstract Optional<String> getQueryExecutionProject();

public abstract Integer getMaxStreamCount();

Expand Down Expand Up @@ -159,7 +158,7 @@ public Builder setQueryWithExecutionProjectAndCredentialsOptions(
* @param query A BigQuery standard SQL query.
* @return This {@link Builder} instance.
*/
public abstract Builder setQuery(String query);
public abstract Builder setQuery(@Nullable String query);

/**
* Sets the GCP project where the configured query will be run. In case the query
Expand All @@ -168,7 +167,7 @@ public Builder setQueryWithExecutionProjectAndCredentialsOptions(
* @param projectId A GCP project.
* @return This {@link Builder} instance.
*/
public abstract Builder setQueryExecutionProject(String projectId);
public abstract Builder setQueryExecutionProject(@Nullable String projectId);

/**
* Sets the restriction the rows in the BigQuery table must comply to be returned by the
Expand All @@ -194,7 +193,7 @@ public Builder setQueryWithExecutionProjectAndCredentialsOptions(
* @param snapshotTs The snapshot's time in milliseconds since epoch.
* @return This {@link Builder} instance.
*/
public abstract Builder setSnapshotTimestampInMillis(Long snapshotTs);
public abstract Builder setSnapshotTimestampInMillis(@Nullable Long snapshotTs);

/**
* Sets the maximum number of read streams that BigQuery should create to retrieve data from
Expand Down Expand Up @@ -236,19 +235,21 @@ public final BigQueryReadOptions build() {
readOptions.getMaxStreamCount() >= 0,
"The max number of streams should be zero or positive.");
Preconditions.checkState(
!Optional.ofNullable(readOptions.getSnapshotTimestampInMillis())
!readOptions
.getSnapshotTimestampInMillis()
// see if the value is lower than the epoch
.filter(timeInMillis -> timeInMillis < Instant.EPOCH.toEpochMilli())
// if present, then fail
.isPresent(),
"The oldest timestamp should be equal or bigger than epoch.");
Preconditions.checkState(
!Optional.ofNullable(readOptions.getQuery())
!readOptions
.getQuery()
// if the project was not configured
.filter(q -> readOptions.getQueryExecutionProject() == null)
.filter(unusedQuery -> readOptions.getQueryExecutionProject() == null)
// if present fail
.isPresent(),
"If a query is configured, then a GCP projec should be provided.");
"If a query is configured, then a GCP project should be provided.");

return readOptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ public class BigQuerySourceReaderContext implements SourceReaderContext {

private final SourceReaderContext readerContext;
private final AtomicInteger readCount = new AtomicInteger(0);
private final Integer limit;
private final int limit;

public BigQuerySourceReaderContext(SourceReaderContext readerContext, Integer limit) {
public BigQuerySourceReaderContext(SourceReaderContext readerContext, int limit) {
this.readerContext = readerContext;
this.limit = limit;
}
Expand Down Expand Up @@ -73,15 +73,19 @@ public UserCodeClassLoader getUserCodeClassLoader() {
return readerContext.getUserCodeClassLoader();
}

public Integer updateReadCount(Integer newReads) {
/**
 * Adds {@code newReads} to the running count of records read and returns the updated total.
 *
 * <p>Takes a primitive {@code int} for consistency with the rest of this class (the
 * constructor and {@code willExceedLimit}), which also removes the latent NPE that auto-
 * unboxing a null {@code Integer} would have caused.
 *
 * @param newReads Number of records just read.
 * @return The total number of records read so far by this context.
 */
public int updateReadCount(int newReads) {
    return readCount.addAndGet(newReads);
}

public Integer currentReadCount() {
/** Returns the number of records read so far, without modifying the counter. */
public int currentReadCount() {
    return this.readCount.get();
}

public boolean willItBeOverLimit(Integer newReads) {
/** Indicates whether a positive record limit was configured (i.e. a limit was pushed down). */
public boolean isLimitPushedDown() {
    return this.limit > 0;
}

/**
 * Checks whether reading {@code newReads} more records would reach or pass the configured
 * limit. Always {@code false} when no positive limit is set.
 *
 * @param newReads Number of records about to be read.
 * @return Whether the configured limit would be reached or exceeded.
 */
public boolean willExceedLimit(int newReads) {
    if (this.limit <= 0) {
        // No limit pushed down: reads are unbounded.
        return false;
    }
    return this.readCount.get() + newReads >= this.limit;
}
}
Loading

0 comments on commit c923f31

Please sign in to comment.