Show actual error message in case of schema violations (#37)
* Show actual error message in case of schema violations

* apply spotless

* Add validation of table definitions

* Add tests
snuyanzin authored Aug 7, 2023
1 parent 5a9652c commit 27e54b1
Showing 6 changed files with 506 additions and 28 deletions.
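As context for the first file's hunks below: before this change, a schema violation during an append surfaced only as a wrapped IOException; the commit instead unwraps the Storage Write API's per-row error map. A minimal standalone sketch of that pattern (the helper class and method names are illustrative, not part of the commit; exact error wording depends on the client version):

import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.protobuf.Descriptors;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;

// Hypothetical helper showing the pattern the commit introduces: surface the
// per-row serialization errors instead of a generic failure.
final class AppendErrorReporting {
  static void appendOrExplain(JsonStreamWriter writer, JSONArray rows)
      throws IOException, Descriptors.DescriptorValidationException, InterruptedException,
          ExecutionException {
    try {
      writer.append(rows).get();
    } catch (Exceptions.AppendSerializationError ase) {
      // Maps row index -> reason, i.e. which row (and why) violated the schema.
      Map<Integer, String> rowErrors = ase.getRowIndexToErrorMessage();
      if (rowErrors != null && !rowErrors.isEmpty()) {
        throw new RuntimeException(rowErrors.toString(), ase);
      }
      throw ase; // no row detail available; rethrow as-is
    }
  }
}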
@@ -9,6 +9,7 @@
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
+import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.protobuf.Descriptors;
import java.io.IOException;
@@ -18,6 +19,7 @@
import java.time.LocalTime;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import javax.annotation.Nonnull;
@@ -135,6 +137,12 @@ public void writeRecord(RowData record) throws IOException {
| InterruptedException
| ExecutionException e) {
throw new IOException(e);
+} catch (Exceptions.AppendSerializationError ase) {
+Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
+if (rowIndexToErrorMessage != null && !rowIndexToErrorMessage.isEmpty()) {
+throw new BigQueryConnectorRuntimeException(rowIndexToErrorMessage.toString(), ase);
+}
+throw ase;
}
}

@@ -0,0 +1,22 @@
package io.aiven.flink.connectors.bigquery.sink;

public class BigQueryConnectorRuntimeException extends RuntimeException {
public BigQueryConnectorRuntimeException() {}

public BigQueryConnectorRuntimeException(String message) {
super(message);
}

public BigQueryConnectorRuntimeException(String message, Throwable cause) {
super(message, cause);
}

public BigQueryConnectorRuntimeException(Throwable cause) {
super(cause);
}

public BigQueryConnectorRuntimeException(
String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
151 changes: 132 additions & 19 deletions src/main/java/io/aiven/flink/connectors/bigquery/sink/BigQuerySink.java
@@ -4,12 +4,15 @@
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
+import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.storage.v1.TableName;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
@@ -63,11 +66,16 @@ public class BigQuerySink implements DynamicTableSink {
private final CatalogTable catalogTable;
private final ResolvedSchema tableSchema;
private final BigQueryConnectionOptions options;
+private final DataType expectedDatatype;

public BigQuerySink(
-CatalogTable catalogTable, ResolvedSchema tableSchema, BigQueryConnectionOptions options) {
+CatalogTable catalogTable,
+ResolvedSchema tableSchema,
+DataType expectedDatatype,
+BigQueryConnectionOptions options) {
this.catalogTable = catalogTable;
this.tableSchema = tableSchema;
+this.expectedDatatype = expectedDatatype;
this.options = options;
}

@@ -93,6 +101,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
if (options.isCreateIfNotExists()) {
ensureTableExists(fieldNames, fieldTypes, options);
}

AbstractBigQueryOutputFormat outputFormat =
new AbstractBigQueryOutputFormat.Builder()
.withFieldNames(fieldNames)
@@ -104,7 +113,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {

@Override
public DynamicTableSink copy() {
-return new BigQuerySink(catalogTable, tableSchema, options);
+return new BigQuerySink(catalogTable, tableSchema, expectedDatatype, options);
}

@Override
@@ -115,33 +124,131 @@ public String asSummaryString() {
@VisibleForTesting
static Table ensureTableExists(
String[] fieldNames, DataType[] types, BigQueryConnectionOptions options) {
-var bigQuery =
+TableName tableName = options.getTableName();
+var bigQueryService =
BigQueryOptions.newBuilder()
-.setProjectId(options.getTableName().getProject())
+.setProjectId(tableName.getProject())
.setCredentials(options.getCredentials())
.build()
.getService();
-var dataset = options.getTableName().getDataset();
-if (bigQuery.getDataset(dataset) == null || !bigQuery.getDataset(dataset).exists()) {
-bigQuery.create(DatasetInfo.newBuilder(dataset).build());
+var dataset = tableName.getDataset();
+if (bigQueryService.getDataset(dataset) == null
+|| !bigQueryService.getDataset(dataset).exists()) {
+bigQueryService.create(DatasetInfo.newBuilder(dataset).build());
}
-var tableId =
-TableId.of(
-options.getTableName().getProject(),
-options.getTableName().getDataset(),
-options.getTableName().getTable());
-Table table = bigQuery.getTable(tableId);
+var tableId = TableId.of(tableName.getProject(), tableName.getDataset(), tableName.getTable());
+Table table = bigQueryService.getTable(tableId);
+StandardTableDefinition requiredDefinition =
+StandardTableDefinition.newBuilder().setSchema(schemaBuilder(fieldNames, types)).build();
if (table == null || !table.exists()) {
-return bigQuery.create(
-TableInfo.of(
-tableId,
-StandardTableDefinition.newBuilder()
-.setSchema(schemaBuilder(fieldNames, types))
-.build()));
+return bigQueryService.create(TableInfo.of(tableId, requiredDefinition));
+} else {
+TableDefinition existingDefinition = table.getDefinition();
+FieldList existingFieldList = existingDefinition.getSchema().getFields();
+FieldList fieldList = requiredDefinition.getSchema().getFields();
+validateTableDefinitions(existingFieldList, fieldList, null);
+}
return table;
}

private static void validateTableDefinitions(
FieldList existingFieldList, FieldList fieldList, String parentTypeName) {
if (existingFieldList.size() < fieldList.size()) {
throw new ValidationException(
"Number of columns in BQ table ("
+ existingFieldList.size()
+ ") should be not less than a number of columns in corresponding Flink table ("
+ fieldList.size()
+ ")");
}
int fieldIndex = 0;
final String parentName = (parentTypeName == null ? "" : parentTypeName + ".");
for (int i = 0; i < existingFieldList.size(); i++) {
final Field existingField = existingFieldList.get(i);
Field.Mode existingFieldMode = existingField.getMode();
if (fieldIndex >= fieldList.size()) {
if (existingFieldMode != Field.Mode.NULLABLE) {
throw new ValidationException(
"Column #"
+ (i + 1)
+ " with name '"
+ parentName
+ existingField.getName()
+ "' in BQ is required however is absent in Flink table");
}
continue;
}
final Field fieldFromInsert = fieldList.get(fieldIndex);

if (existingFieldMode == Field.Mode.NULLABLE
&& !existingField.getName().equals(fieldFromInsert.getName())) {
continue;
}
if (!fieldFromInsert.getName().equals(existingField.getName())) {
throw new ValidationException(
"Column #"
+ (i + 1)
+ " has name '"
+ parentName
+ existingField.getName()
+ "' in BQ while in Flink it has name '"
+ parentName
+ fieldFromInsert.getName()
+ "'");
}
if (!fieldFromInsert
.getType()
.getStandardType()
.equals(existingField.getType().getStandardType())) {
throw new ValidationException(
"Column #"
+ (i + 1)
+ " with name '"
+ parentName
+ existingField.getName()
+ "' has type '"
+ existingField.getType().getStandardType()
+ "' in BQ while in Flink it has type '"
+ fieldFromInsert.getType().getStandardType()
+ "'");
}
if (existingFieldMode != Field.Mode.NULLABLE
&& fieldFromInsert.getMode() == Field.Mode.NULLABLE) {
if (!parentName.isBlank()) {
// currently this is a bug in Calcite/Flink
// so this validation will be done at runtime on BigQuery side
fieldIndex++;
continue;
}
throw new ValidationException(
"Column #"
+ (i + 1)
+ " with name '"
+ parentName
+ existingField.getName()
+ "' is not nullable '"
+ existingField.getType().getStandardType()
+ "' in BQ while in Flink it is nullable '"
+ fieldFromInsert.getType().getStandardType()
+ "'");
}
if (LegacySQLTypeName.RECORD.equals(existingField.getType())) {
validateTableDefinitions(
existingField.getSubFields(), fieldFromInsert.getSubFields(), existingField.getName());
}
fieldIndex++;
}
if (fieldIndex != fieldList.size()) {
throw new ValidationException(
"There are unknown columns starting with #"
+ (fieldIndex + 1)
+ " with name '"
+ parentName
+ fieldList.get(fieldIndex).getName()
+ "'");
}
}

private static Schema schemaBuilder(String[] fieldNames, DataType[] types) {
// ARRAY of ARRAYs is not allowed based on Google BigQuery doc
// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#constructing_an_array
@@ -176,6 +283,12 @@ private static Field buildField(String fieldName, LogicalType logicalType) {
if (standardSQLTypeName == null) {
throw new ValidationException("Type " + logicalType + " is not supported");
}
+if (logicalType.is(LogicalTypeRoot.ARRAY) && logicalType.getChildren().get(0).isNullable()) {
+throw new ValidationException(
+"Type "
++ logicalType
++ " is not supported (nullable elements of array are not supported by BQ)");
+}
Field.Builder fBuilder;
if (logicalType.is(LogicalTypeRoot.ROW)
|| (logicalType.is(LogicalTypeRoot.ARRAY)
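To make the new validateTableDefinitions contract concrete, here is a sketch (mine, not from the commit) of schema pairs it accepts and rejects, built with the BigQuery client's Field/FieldList types; the expected outcomes follow the logic in the hunk above:

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.StandardSQLTypeName;

// Illustrative schema pairs for the new validation: the BQ table may carry
// extra NULLABLE columns, but matching columns must agree on name and
// standard type, and a REQUIRED BQ column may not be missing or made
// nullable on the Flink side (at the top level).
public final class ValidationShapes {
  public static void main(String[] args) {
    FieldList bq =
        FieldList.of(
            Field.newBuilder("id", StandardSQLTypeName.INT64)
                .setMode(Field.Mode.REQUIRED)
                .build(),
            Field.newBuilder("note", StandardSQLTypeName.STRING)
                .setMode(Field.Mode.NULLABLE)
                .build());

    // Accepted: Flink omits the NULLABLE "note" column; the validator skips it.
    FieldList flinkOk =
        FieldList.of(
            Field.newBuilder("id", StandardSQLTypeName.INT64)
                .setMode(Field.Mode.REQUIRED)
                .build());

    // Rejected: "id" is REQUIRED in BQ but NULLABLE here -> ValidationException.
    FieldList flinkRejected =
        FieldList.of(
            Field.newBuilder("id", StandardSQLTypeName.INT64)
                .setMode(Field.Mode.NULLABLE)
                .build());

    System.out.println(bq + " vs " + flinkOk + " / " + flinkRejected);
  }
}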
@@ -49,7 +49,10 @@ public DynamicTableSink createDynamicTableSink(Context context) {
config.get(DELIVERY_GUARANTEE),
credentials);
return new BigQuerySink(
-context.getCatalogTable(), context.getCatalogTable().getResolvedSchema(), options);
+context.getCatalogTable(),
+context.getCatalogTable().getResolvedSchema(),
+context.getPhysicalRowDataType(),
+options);
}

@Override
@@ -40,7 +40,8 @@ public void testSinkForDifferentTypes() throws Exception {
Column.metadata("tmp9", DataTypes.TIMESTAMP(9).nullable(), null, false),
Column.metadata("date", DataTypes.DATE().nullable(), null, false),
Column.metadata("time", DataTypes.TIME().nullable(), null, false),
Column.metadata("array", DataTypes.ARRAY(DataTypes.INT()).nullable(), null, false),
Column.metadata(
"array", DataTypes.ARRAY(DataTypes.INT().notNull()).nullable(), null, false),
Column.metadata(
"row",
DataTypes.ROW(
@@ -52,7 +53,7 @@
Column.metadata("decimal", DataTypes.DECIMAL(5, 3).nullable(), null, false),
Column.metadata(
"decimal_array",
-DataTypes.ARRAY(DataTypes.DECIMAL(7, 2).nullable()).nullable(),
+DataTypes.ARRAY(DataTypes.DECIMAL(7, 2).notNull()).nullable(),
null,
false));
tableEnv
@@ -67,10 +68,10 @@
+ " `tmp9` TIMESTAMP(9),\n"
+ " `date` DATE,\n"
+ " `time` TIME,\n"
+ " `array` ARRAY<INT>,\n"
+ " `array` ARRAY<INT NOT NULL>,\n"
+ " `row` ROW<string_field STRING, int_field INT, date_field DATE>,\n"
+ " `decimal` DECIMAL(5, 3),\n"
+ " `decimal_array` ARRAY<DECIMAL(7, 2)>\n"
+ " `decimal_array` ARRAY<DECIMAL(7, 2) NOT NULL>\n"
+ ") WITH (\n"
+ " 'connector' = 'bigquery',"
+ " 'service-account' = '"
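The test updates above track the new buildField check: BigQuery arrays cannot contain NULL elements, so Flink array element types must now be declared NOT NULL, while the array column itself may stay nullable. A tiny sketch of that distinction (class name illustrative, not from the commit):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// Element nullability is what changed; the array column may still be nullable.
public final class ArrayNullability {
  public static void main(String[] args) {
    // Accepted by the sink after this commit: NOT NULL elements.
    DataType ok = DataTypes.ARRAY(DataTypes.INT().notNull()).nullable();
    // Rejected with ValidationException: nullable elements cannot be
    // represented in a BigQuery ARRAY.
    DataType rejected = DataTypes.ARRAY(DataTypes.INT()).nullable();
    System.out.println(ok + " / " + rejected);
  }
}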
