Added test to validate recovery when StorageRead client fails #50

Merged
104 commits
eb9dc8f
cleaning up the main branch for new version.
prodriguezdefino Jun 27, 2023
0695673
updates in ignore and readme files
prodriguezdefino Jun 27, 2023
e36cc44
prepping the pom addition, added parent's compliance tools
prodriguezdefino Jun 27, 2023
e167803
adding parent pom and the connector impl project pom
prodriguezdefino Jun 28, 2023
8043378
adding common functionalities
prodriguezdefino Jun 28, 2023
b455b4e
added the bigquery services wrapper and factories
prodriguezdefino Jun 28, 2023
27cd837
creates the split, its state and the enumerator state
prodriguezdefino Jun 28, 2023
4d4b60f
added configs, split reader and split assignment impls
prodriguezdefino Jun 28, 2023
0567b58
applying recommendations from sonartype-lift
prodriguezdefino Jun 28, 2023
9006714
adding the Datastream source implementation for BigQuery
prodriguezdefino Jun 28, 2023
d5d95bf
added Table API implementation for BigQuery
prodriguezdefino Jun 29, 2023
1263768
adding the example and shaded distro jar, fixes a NPE when the provid…
prodriguezdefino Jun 29, 2023
4e185bd
added covertura analysis through open clover and improved few tests
prodriguezdefino Jun 30, 2023
4e63b02
fixes partition and row restriction orden for push downs
prodriguezdefino Jul 1, 2023
74d5b90
adds a test which make the storage read server stream iterator fail f…
prodriguezdefino Jul 2, 2023
5743292
merge changes from master (previous pom deletion resolution)
prodriguezdefino Jul 6, 2023
cab9115
fixing the package name for the schema namespace
prodriguezdefino Jul 6, 2023
3375d54
Merge branch 'common_code_source' into bq_services_wrappers
prodriguezdefino Jul 10, 2023
849f769
merged main branch and took care of few lift comments
prodriguezdefino Jul 11, 2023
fd70b95
Merge branch 'bq_services_wrappers' into source_splits
prodriguezdefino Jul 11, 2023
0f18d14
merge from source_split
prodriguezdefino Jul 11, 2023
6b08119
fixing lift recommendations and spotless
prodriguezdefino Jul 11, 2023
5973d26
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Jul 11, 2023
2d9635d
Merge branch 'source_functionality' into table_api
prodriguezdefino Jul 11, 2023
135beeb
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Jul 11, 2023
78aec3e
merge from example and distro branch
prodriguezdefino Jul 11, 2023
3ffb582
fixes namespace error in test and spotless
prodriguezdefino Jul 11, 2023
cc45b2f
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Jul 11, 2023
3472934
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
prodriguezdefino Jul 11, 2023
1dabd70
removed repeated mvn reference from command
prodriguezdefino Jul 11, 2023
27fa94a
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
prodriguezdefino Jul 11, 2023
1d0dafa
adds documentation to enum
prodriguezdefino Jul 11, 2023
289c31c
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
prodriguezdefino Jul 11, 2023
f473d57
addressing comments from review
prodriguezdefino Jul 27, 2023
c178f83
merge from main
prodriguezdefino Aug 1, 2023
09eaaa4
merge from master
prodriguezdefino Aug 1, 2023
def3cc4
Merge branch 'source_splits' into split_assigner_and_reader
prodriguezdefino Aug 1, 2023
ceabb12
fixed type reference Int -> Long
prodriguezdefino Aug 1, 2023
0dc8875
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Aug 3, 2023
963c80b
Merge branch 'source_functionality' into table_api
prodriguezdefino Aug 3, 2023
5206ef1
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Aug 3, 2023
2b59a7c
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
prodriguezdefino Aug 3, 2023
4eb1137
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
prodriguezdefino Aug 3, 2023
6db9dea
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
prodriguezdefino Aug 3, 2023
1734bac
merge from main
prodriguezdefino Aug 8, 2023
e96ff59
addressing comments from review
prodriguezdefino Aug 8, 2023
3b78492
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Aug 8, 2023
c492f02
improved hashcode and equals readability
prodriguezdefino Aug 8, 2023
6a05498
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Aug 8, 2023
820fb3b
Merge branch 'source_functionality' into table_api
prodriguezdefino Aug 8, 2023
dd5be94
improve readibility for hashcode and equals
prodriguezdefino Aug 8, 2023
fbd07c6
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Aug 8, 2023
a44b11d
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
prodriguezdefino Aug 8, 2023
156552f
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
prodriguezdefino Aug 8, 2023
31559d5
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
prodriguezdefino Aug 8, 2023
9aae0af
changed tests to use google-truth instead of junit or assertj asserti…
prodriguezdefino Aug 9, 2023
61e5644
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Aug 9, 2023
517de82
added google-truth to tests
prodriguezdefino Aug 9, 2023
d916a1c
Merge branch 'source_functionality' into table_api
prodriguezdefino Aug 9, 2023
11fce0d
added google-truth to tests
prodriguezdefino Aug 9, 2023
099078e
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Aug 10, 2023
40db3cb
merge from previous branch
prodriguezdefino Aug 10, 2023
8dbf958
merge from previous branch
prodriguezdefino Aug 10, 2023
d1c56d4
merge from previous branch
prodriguezdefino Aug 10, 2023
9da7706
merge from master after #44
prodriguezdefino Oct 31, 2023
a10470c
Merge branch 'source_functionality' into table_api
prodriguezdefino Oct 31, 2023
6b28a0c
removing guava dependency from file
prodriguezdefino Nov 1, 2023
7160ff9
merge from master
prodriguezdefino Nov 1, 2023
87780c5
adding serializable autovalue annotation to avoid storing an Optional…
prodriguezdefino Nov 2, 2023
3f4d1be
addressing comments from review
prodriguezdefino Nov 2, 2023
c8eb789
addressed comments from review
prodriguezdefino Nov 2, 2023
fd79610
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Nov 2, 2023
8008260
merge from #47
prodriguezdefino Nov 2, 2023
c923f31
merge from #48
prodriguezdefino Nov 2, 2023
d135144
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
prodriguezdefino Nov 3, 2023
cce6567
partition list in 1024 chunks
prodriguezdefino Nov 3, 2023
f36f91f
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Nov 3, 2023
59472b7
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
prodriguezdefino Nov 3, 2023
3d994a2
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
prodriguezdefino Nov 3, 2023
d9027ab
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
prodriguezdefino Nov 3, 2023
5260073
Address review comments
jayehwhyehentee Nov 23, 2023
936076b
adding the example and shaded distro jar, fixes a NPE when the provid…
prodriguezdefino Jun 29, 2023
7977576
Merge branch 'add_example_and_shadedsqljar' of https://github.com/pro…
jayehwhyehentee Nov 24, 2023
1460af6
added covertura analysis through open clover and improved few tests
prodriguezdefino Jun 30, 2023
a8499c3
removed repeated mvn reference from command
prodriguezdefino Jul 11, 2023
229f33d
Merge branch 'main' into use_openclover_coverage
jayehwhyehentee Nov 24, 2023
68d0bf9
Merge branch 'main' into add_example_and_shadedsqljar
jayehwhyehentee Nov 24, 2023
608e3c7
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
jayehwhyehentee Nov 24, 2023
9d130a4
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
jayehwhyehentee Nov 24, 2023
2711002
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
jayehwhyehentee Nov 24, 2023
c9f73ee
Address review comments
jayehwhyehentee Nov 27, 2023
8c9cc26
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
jayehwhyehentee Nov 27, 2023
9a6a3bf
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
jayehwhyehentee Nov 27, 2023
c6ba786
Address review comments
jayehwhyehentee Nov 27, 2023
27c34d0
Make checkpoint interval configurable in example jar
jayehwhyehentee Nov 27, 2023
fa4f5a4
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
jayehwhyehentee Nov 27, 2023
582879e
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
jayehwhyehentee Nov 27, 2023
2f4518a
Merge branch 'main' into partition_restriction_fixes
jayehwhyehentee Nov 27, 2023
dde0b6d
Remove incorrect dependency to fix build failure
jayehwhyehentee Nov 27, 2023
04497c5
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
jayehwhyehentee Nov 27, 2023
f173a87
Add TODO to mention a caveat in BQ utils.
jayehwhyehentee Nov 27, 2023
af4531e
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
jayehwhyehentee Nov 27, 2023
9c5ba89
Use java.util.Random with seed for deterministic tests
jayehwhyehentee Nov 27, 2023
33573b8
Merge branch 'main' into test_bqstoragereadapi_error
jayehwhyehentee Nov 28, 2023
@@ -173,7 +173,7 @@ public RecordsWithSplitIds<GenericRecord> fetch() throws IOException {
ReadRowsResponse response = readStreamIterator.next();
if (!response.hasAvroRows()) {
LOG.info(
- "[subtask #{}][hostname %s] The response contained"
+ "[subtask #{}][hostname {}] The response contained"
+ " no avro records for stream {}.",
readerContext.getIndexOfSubtask(),
readerContext.getLocalHostName(),
@@ -213,7 +213,7 @@ public RecordsWithSplitIds<GenericRecord> fetch() throws IOException {
}
Long itTimeMs = System.currentTimeMillis() - itStartTime;
LOG.debug(
- "[subtask #{}][hostname %s] Completed reading iteration in {}ms,"
+ "[subtask #{}][hostname {}] Completed reading iteration in {}ms,"
+ " so far read {} from stream {}.",
readerContext.getIndexOfSubtask(),
readerContext.getLocalHostName(),
@@ -240,7 +240,7 @@ public RecordsWithSplitIds<GenericRecord> fetch() throws IOException {
Long splitTimeMs = System.currentTimeMillis() - splitStartFetch;
this.readSplitTimeMetric.ifPresent(m -> m.update(splitTimeMs));
LOG.info(
- "[subtask #{}][hostname %s] Completed reading split, {} records in {}ms on stream {}.",
+ "[subtask #{}][hostname {}] Completed reading split, {} records in {}ms on stream {}.",
readerContext.getIndexOfSubtask(),
readerContext.getLocalHostName(),
readSoFar,
@@ -253,7 +253,7 @@ public RecordsWithSplitIds<GenericRecord> fetch() throws IOException {
} else {
Long fetchTimeMs = System.currentTimeMillis() - fetchStartTime;
LOG.debug(
- "[subtask #{}][hostname %s] Completed a partial fetch in {}ms,"
+ "[subtask #{}][hostname {}] Completed a partial fetch in {}ms,"
+ " so far read {} from stream {}.",
readerContext.getIndexOfSubtask(),
readerContext.getLocalHostName(),
@@ -263,13 +263,15 @@
}
return respBuilder.build();
} catch (Exception ex) {
+ // release the iterator just in case
+ readStreamIterator = null;
throw new IOException(
String.format(
"[subtask #%d][hostname %s] Problems while reading stream %s from BigQuery"
+ " with connection info %s. Current split offset %d,"
+ " reader offset %d. Flink options %s.",
readerContext.getIndexOfSubtask(),
- readerContext.getLocalHostName(),
+ Optional.ofNullable(readerContext.getLocalHostName()).orElse("NA"),
+ Optional.ofNullable(assignedSplit.getStreamName()).orElse("NA"),
readOptions.toString(),
assignedSplit.getOffset(),
@@ -296,15 +298,15 @@ public void handleSplitsChanges(SplitsChange<BigQuerySourceSplit> splitsChanges)
@Override
public void wakeUp() {
LOG.debug(
- "[subtask #{}][hostname %s] Wake up called.",
+ "[subtask #{}][hostname {}] Wake up called.",
readerContext.getIndexOfSubtask(), readerContext.getLocalHostName());
// do nothing, for now
}

@Override
public void close() throws Exception {
LOG.debug(
- "[subtask #{}][hostname %s] Close called, assigned splits {}.",
+ "[subtask #{}][hostname {}] Close called, assigned splits {}.",
readerContext.getIndexOfSubtask(),
readerContext.getLocalHostName(),
assignedSplits.toString());
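The recurring %s → {} change above is worth spelling out: SLF4J's LOG methods substitute "{}" placeholders positionally and know nothing about printf conversions, so a leftover "%s" is logged literally while its argument is silently ignored. The following self-contained sketch (a hypothetical stand-in for SLF4J's formatter, with no Flink or SLF4J dependency) reproduces both behaviors:

```java
// Minimal imitation of SLF4J-style "{}" substitution, for illustration only.
public class PlaceholderDemo {

    // Replace each "{}" in the template with the next argument, left to right;
    // anything else (including "%s") is copied through untouched.
    static String format(String template, Object... args) {
        StringBuilder out = new StringBuilder();
        int argIdx = 0;
        int i = 0;
        while (i < template.length()) {
            if (argIdx < args.length
                    && i + 1 < template.length()
                    && template.charAt(i) == '{'
                    && template.charAt(i + 1) == '}') {
                out.append(args[argIdx++]);
                i += 2;
            } else {
                out.append(template.charAt(i++));
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Buggy template: "%s" is not a placeholder, so the hostname is dropped.
        System.out.println(format("[subtask #{}][hostname %s] done", 3, "worker-1"));
        // prints: [subtask #3][hostname %s] done

        // Fixed template: both arguments are substituted.
        System.out.println(format("[subtask #{}][hostname {}] done", 3, "worker-1"));
        // prints: [subtask #3][hostname worker-1] done
    }
}
```

Note that the catch block later in this file builds its message with String.format, where %s and %d are the correct conversions; the bug was only ever in the lazily formatted SLF4J templates.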
@@ -16,6 +16,7 @@

package com.google.cloud.flink.bigquery.fakes;

+ import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableFunction;

import com.google.api.services.bigquery.model.Job;
@@ -62,6 +63,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Random;
+ import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@@ -131,18 +133,60 @@ public Job dryRunQuery(String projectId, String query) {
};
}

+ static class FaultyIterator<T> implements Iterator<T> {
+ 
+ private final Iterator<T> realIterator;
+ private final Double errorPercentage;
+ private final Random random = new Random(42);
+ 
+ public FaultyIterator(Iterator<T> realIterator, Double errorPercentage) {
+ this.realIterator = realIterator;
+ Preconditions.checkState(
+ 0 <= errorPercentage && errorPercentage <= 100,
+ "The error percentage should be between 0 and 100");
+ this.errorPercentage = errorPercentage;
+ }
+ 
+ @Override
+ public boolean hasNext() {
+ return realIterator.hasNext();
+ }
+ 
+ @Override
+ public T next() {
+ if (random.nextDouble() * 100 < errorPercentage) {
+ throw new RuntimeException(
+ "Faulty iterator has failed, it will happen with a chance of: "
+ + errorPercentage);
+ }
+ return realIterator.next();
+ }
+ 
+ @Override
+ public void remove() {
+ realIterator.remove();
+ }
+ 
+ @Override
+ public void forEachRemaining(Consumer<? super T> action) {
+ realIterator.forEachRemaining(action);
+ }
+ }

/** Implementation of the server stream for testing purposes. */
public static class FakeBigQueryServerStream
implements BigQueryServices.BigQueryServerStream<ReadRowsResponse> {

private final List<ReadRowsResponse> toReturn;
+ private final Double errorPercentage;

public FakeBigQueryServerStream(
SerializableFunction<RecordGenerationParams, List<GenericRecord>> dataGenerator,
String schema,
String dataPrefix,
Long size,
- Long offset) {
+ Long offset,
+ Double errorPercentage) {
this.toReturn =
createResponse(
schema,
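The added FaultyIterator is what makes the new recovery test both faulty and repeatable: failures are injected on a percentage of next() calls, but the Random is seeded with 42, so the failure always lands on the same record. Below is a standalone sketch of that idea in plain Java (no Flink; the names are illustrative, not the connector's API):

```java
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Demonstrates why a seeded Random makes fault injection deterministic.
public class FaultyIteratorDemo {

    // Consume the iterator until an injected failure would occur; return how
    // many elements were read before that point (or all of them if none fails).
    static <T> int countUntilFailure(Iterator<T> real, double errorPercentage, long seed) {
        Random random = new Random(seed);
        int consumed = 0;
        while (real.hasNext()) {
            if (random.nextDouble() * 100 < errorPercentage) {
                return consumed; // the position where next() would have thrown
            }
            real.next();
            consumed++;
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<Integer> data =
                IntStream.range(0, 1000).boxed().collect(Collectors.toList());
        int first = countUntilFailure(data.iterator(), 10D, 42L);
        int second = countUntilFailure(data.iterator(), 10D, 42L);
        // Same seed, same data: the failure lands on exactly the same record.
        System.out.println(first == second); // prints: true
    }
}
```

That determinism is what lets the integration test assert an exact record count after recovery instead of tolerating flaky outcomes.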
@@ -153,11 +197,12 @@ public FakeBigQueryServerStream(
.collect(Collectors.toList()),
0,
size);
+ this.errorPercentage = errorPercentage;
}

@Override
public Iterator<ReadRowsResponse> iterator() {
- return toReturn.iterator();
+ return new FaultyIterator<>(toReturn.iterator(), errorPercentage);
}

@Override
@@ -170,13 +215,22 @@ public static class FakeBigQueryStorageReadClient implements StorageReadClient {
private final ReadSession session;
private final SerializableFunction<RecordGenerationParams, List<GenericRecord>>
dataGenerator;
+ private final Double errorPercentage;

public FakeBigQueryStorageReadClient(
ReadSession session,
SerializableFunction<RecordGenerationParams, List<GenericRecord>>
dataGenerator) {
+ this(session, dataGenerator, 0D);
+ }
+ 
+ public FakeBigQueryStorageReadClient(
+ ReadSession session,
+ SerializableFunction<RecordGenerationParams, List<GenericRecord>> dataGenerator,
+ Double errorPercentage) {
this.session = session;
this.dataGenerator = dataGenerator;
+ this.errorPercentage = errorPercentage;
}

@Override
@@ -196,7 +250,8 @@ public BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request)
session.getAvroSchema().getSchema(),
request.getReadStream(),
session.getEstimatedRowCount(),
- request.getOffset());
+ request.getOffset(),
+ errorPercentage);
}

@Override
@@ -381,6 +436,17 @@ public static BigQueryReadOptions createReadOptions(
String avroSchemaString,
SerializableFunction<RecordGenerationParams, List<GenericRecord>> dataGenerator)
throws IOException {
+ return createReadOptions(
+ expectedRowCount, expectedReadStreamCount, avroSchemaString, dataGenerator, 0D);
+ }
+ 
+ public static BigQueryReadOptions createReadOptions(
+ Integer expectedRowCount,
+ Integer expectedReadStreamCount,
+ String avroSchemaString,
+ SerializableFunction<RecordGenerationParams, List<GenericRecord>> dataGenerator,
+ Double errorPercentage)
+ throws IOException {
return BigQueryReadOptions.builder()
.setBigQueryConnectOptions(
BigQueryConnectOptions.builder()
@@ -397,7 +463,8 @@ public static BigQueryReadOptions createReadOptions(
expectedRowCount,
expectedReadStreamCount,
avroSchemaString),
- dataGenerator));
+ dataGenerator,
+ errorPercentage));
})
.build())
.build();
@@ -53,6 +53,7 @@ public class BigQuerySourceITCase {

private static final int PARALLELISM = 2;
private static final Integer TOTAL_ROW_COUNT_PER_STREAM = 10000;
+ private static final Integer STREAM_COUNT = 2;

@RegisterExtension
static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
@@ -71,7 +72,7 @@ public void beforeTest() throws Exception {
readOptions =
StorageClientFaker.createReadOptions(
TOTAL_ROW_COUNT_PER_STREAM,
- 2,
+ STREAM_COUNT,
StorageClientFaker.SIMPLE_AVRO_SCHEMA_STRING);
}

@@ -108,7 +109,7 @@ public void testReadCount() throws Exception {
.executeAndCollect());

// we only create 2 streams as response
- assertThat(results).hasSize(TOTAL_ROW_COUNT_PER_STREAM * PARALLELISM);
+ assertThat(results).hasSize(TOTAL_ROW_COUNT_PER_STREAM * STREAM_COUNT);
}

@Test
Expand All @@ -121,12 +122,12 @@ public void testLimit() throws Exception {
List<RowData> results =
env.fromSource(bqSource, WatermarkStrategy.noWatermarks(), "BigQuery-Source")
.executeAndCollect(TOTAL_ROW_COUNT_PER_STREAM);

// need to check on parallelism since the limit is triggered per task + reader contexts = 2
assertThat(results).hasSize(limitSize * PARALLELISM);
}

@Test
- public void testRecovery() throws Exception {
+ public void testDownstreamRecovery() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(300L);

@@ -142,7 +143,46 @@
.map(new FailingMapper(failed))
.executeAndCollect());

- assertThat(results).hasSize(TOTAL_ROW_COUNT_PER_STREAM * PARALLELISM);
+ assertThat(results).hasSize(TOTAL_ROW_COUNT_PER_STREAM * STREAM_COUNT);
}

+ @Test
+ public void testReaderRecovery() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(300L);
+ 
+ ResolvedSchema schema =
+ ResolvedSchema.of(
+ Column.physical("name", DataTypes.STRING()),
+ Column.physical("number", DataTypes.BIGINT()));
+ RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
+ 
+ TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);
+ 
+ BigQuerySource<RowData> bqSource =
+ BigQuerySource.<RowData>builder()
+ .setReadOptions(
+ StorageClientFaker.createReadOptions(
+ // just put more rows JIC
+ TOTAL_ROW_COUNT_PER_STREAM,
+ STREAM_COUNT,
+ StorageClientFaker.SIMPLE_AVRO_SCHEMA_STRING,
+ params -> StorageClientFaker.createRecordList(params),
+ // we want this to fail 10% of the time (1 in 10 times)
+ 10D))
+ .setDeserializationSchema(
+ new AvroToRowDataDeserializationSchema(rowType, typeInfo))
+ .build();
+ 
+ List<RowData> results =
+ CollectionUtil.iteratorToList(
+ env.fromSource(
+ bqSource,
+ WatermarkStrategy.noWatermarks(),
+ "BigQuery-Source")
+ .executeAndCollect());
+ 
+ assertThat(results).hasSize(TOTAL_ROW_COUNT_PER_STREAM * STREAM_COUNT);
+ }
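What testReaderRecovery exercises end to end is an offset-based retry contract: the reader throws mid-stream, the catch block added earlier in this PR discards the iterator (readStreamIterator = null), and reading resumes from the split's committed offset. The loop below simulates that contract in plain Java (illustrative names, no Flink; errorPercentage must stay below 100 or the loop never makes progress):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

// Simulates a reader that fails randomly and a driver that recovers by
// resuming from the last committed offset, never losing or duplicating records.
public class RecoveryDemo {

    static List<Integer> readAll(List<Integer> source, double errorPercentage, long seed) {
        Random random = new Random(seed);
        List<Integer> out = new ArrayList<>();
        int offset = 0; // last committed position, like the split offset in the connector
        while (offset < source.size()) {
            try {
                Iterator<Integer> it = source.listIterator(offset); // fresh "stream"
                while (it.hasNext()) {
                    if (random.nextDouble() * 100 < errorPercentage) {
                        throw new RuntimeException("injected failure");
                    }
                    out.add(it.next());
                    offset++; // commit progress record by record
                }
            } catch (RuntimeException e) {
                // drop the broken iterator and retry from the committed offset
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            source.add(i);
        }
        List<Integer> result = readAll(source, 10D, 42L);
        // Every record is read exactly once despite a 10% per-record failure rate.
        System.out.println(result.equals(source)); // prints: true
    }
}
```

The real test gets the same guarantee from Flink checkpointing plus the split's stored offset; this sketch only mirrors the invariant the assertion checks: TOTAL_ROW_COUNT_PER_STREAM * STREAM_COUNT records, each exactly once.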

private static class FailingMapper