Enable read-session caching by default for faster Spark planning (#1109)
vishalkarve15 authored Oct 31, 2023
1 parent 1f31536 commit 17dbe36
Showing 7 changed files with 27 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -6,6 +6,7 @@
* PR #1094: CVE-2023-5072: Upgrading the org.json:json dependency
* PR #1095: CVE-2023-4586: Upgrading the netty dependencies
* PR #1104: Fixed nested field predicate pushdown
+* PR #1109: Enable read session caching by default for faster Spark planning
* PR #1111: Enable retry of failed messages
* Issue #103: Support for Dynamic partition overwrite for time and range partitioned table
* Issue #1099: Fixing the usage of ExternalAccountCredentials
9 changes: 9 additions & 0 deletions README-template.md
@@ -877,6 +877,15 @@ word-break:break-word
</td>
<td>Write</td>
</tr>
+<tr>
+<td><code>enableReadSessionCaching</code>
+</td>
+<td>Boolean config to enable read session caching. Caches BigQuery read sessions to allow for faster Spark query planning.
+Default value is <code>true</code>.
+<br/> (Optional)
+</td>
+<td>Read</td>
+</tr>

</table>

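As a usage note, the new option rides on the connector's standard read path; nothing else about reading changes. A minimal sketch of reading with the new default and of opting out per read (the table name is just a public sample dataset; any readable table works):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadSessionCachingExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("read-session-caching-example")
        .getOrCreate();

    // As of this commit, read session caching is on by default: no option required.
    Dataset<Row> cached = spark.read()
        .format("bigquery")
        .load("bigquery-public-data.samples.shakespeare");

    // Opt out explicitly if session reuse is undesirable for a given read.
    Dataset<Row> uncached = spark.read()
        .format("bigquery")
        .option("enableReadSessionCaching", "false")
        .load("bigquery-public-data.samples.shakespeare");

    cached.show(5);
    uncached.show(5);
  }
}
```

Opting out can make sense for rapidly changing tables, since a reused read session keeps serving the snapshot it was created against until the cache entry expires.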
@@ -48,7 +48,7 @@ public class ReadSessionCreator {

private static final Logger log = LoggerFactory.getLogger(ReadSessionCreator.class);
private static final Cache<CreateReadSessionRequest, ReadSession> READ_SESSION_CACHE =
-CacheBuilder.newBuilder().expireAfterWrite(4, TimeUnit.HOURS).maximumSize(1000).build();
+CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).maximumSize(1000).build();

private final ReadSessionCreatorConfig config;
private final BigQueryClient bigQueryClient;
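The declaration above uses Guava's `CacheBuilder`, so entries now expire five minutes after they are written and at most 1000 sessions are retained. A rough, self-contained sketch of the get-or-create pattern such a cache supports; `createNewReadSession` is a hypothetical stand-in for the connector's actual Storage API call, not its real code:

```java
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;

public class ReadSessionCacheSketch {

  // Mirrors the declaration in the diff: 5-minute expiry, at most 1000 sessions.
  private static final Cache<CreateReadSessionRequest, ReadSession> READ_SESSION_CACHE =
      CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).maximumSize(1000).build();

  ReadSession getOrCreate(CreateReadSessionRequest request, boolean cachingEnabled) {
    if (!cachingEnabled) {
      return createNewReadSession(request);
    }
    // Reuse a session created for an identical request within the expiry window.
    ReadSession session = READ_SESSION_CACHE.getIfPresent(request);
    if (session == null) {
      session = createNewReadSession(request);
      READ_SESSION_CACHE.put(request, session);
    }
    return session;
  }

  // Hypothetical stand-in for the real BigQuery Storage API call.
  private ReadSession createNewReadSession(CreateReadSessionRequest request) {
    throw new UnsupportedOperationException("stand-in only");
  }
}
```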
@@ -42,7 +42,7 @@ public class ReadSessionCreatorConfigBuilder {
int streamsPerPartition = 1;
private CompressionCodec arrowCompressionCodec = CompressionCodec.COMPRESSION_UNSPECIFIED;
private Optional<String> traceId = Optional.empty();
-private boolean enableReadSessionCaching = false;
+private boolean enableReadSessionCaching = true;

@CanIgnoreReturnValue
public ReadSessionCreatorConfigBuilder setViewsEnabled(boolean viewsEnabled) {
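With the builder's default flipped to `true`, the test changes below all opt out explicitly, presumably so the static, JVM-wide cache cannot hand one test a read session created by another. The recurring fragment added across the tests:

```java
ReadSessionCreatorConfig config =
    new ReadSessionCreatorConfigBuilder()
        .setEnableReadSessionCaching(false)
        .build();
```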
@@ -122,7 +122,10 @@ public void testSerializedInstanceIsPropagated() throws Exception {
Optional<String> encodedBase =
Optional.of(java.util.Base64.getEncoder().encodeToString(request.toByteArray()));
ReadSessionCreatorConfig config =
-new ReadSessionCreatorConfigBuilder().setRequestEncodedBase(encodedBase).build();
+new ReadSessionCreatorConfigBuilder()
+    .setEnableReadSessionCaching(false)
+    .setRequestEncodedBase(encodedBase)
+    .build();
ReadSessionCreator creator =
new ReadSessionCreator(config, bigQueryClient, bigQueryReadClientFactory);
when(bigQueryReadClientFactory.getBigQueryReadClient()).thenReturn(readClient);
@@ -153,7 +156,10 @@ public void testDefaultMinMaxStreamCount() throws Exception {
when(mockBigQueryClientFactory.getBigQueryReadClient()).thenReturn(client);

ReadSessionCreatorConfig config =
-new ReadSessionCreatorConfigBuilder().setDefaultParallelism(10).build();
+new ReadSessionCreatorConfigBuilder()
+    .setEnableReadSessionCaching(false)
+    .setDefaultParallelism(10)
+    .build();
ReadSessionCreator creator =
new ReadSessionCreator(config, bigQueryClient, mockBigQueryClientFactory);
ReadSessionResponse readSessionResponse =
@@ -179,6 +185,7 @@ public void testCustomMinStreamCount() throws Exception {

ReadSessionCreatorConfig config =
new ReadSessionCreatorConfigBuilder()
+    .setEnableReadSessionCaching(false)
.setDefaultParallelism(10)
.setPreferredMinParallelism(OptionalInt.of(21_000))
.build();
@@ -206,6 +213,7 @@ public void testCustomMaxStreamCount() throws Exception {

ReadSessionCreatorConfig config =
new ReadSessionCreatorConfigBuilder()
+    .setEnableReadSessionCaching(false)
.setDefaultParallelism(10)
.setMaxParallelism(OptionalInt.of(21_000))
.build();
@@ -234,6 +242,7 @@ public void testMinStreamCountGreaterThanMaxStreamCount() throws Exception {

ReadSessionCreatorConfig config =
new ReadSessionCreatorConfigBuilder()
+    .setEnableReadSessionCaching(false)
.setPreferredMinParallelism(OptionalInt.of(21_000))
.setMaxParallelism(OptionalInt.of(10))
.build();
@@ -261,6 +270,7 @@ public void testMaxStreamCountWithoutMinStreamCount() throws Exception {

ReadSessionCreatorConfig config =
new ReadSessionCreatorConfigBuilder()
+    .setEnableReadSessionCaching(false)
.setDefaultParallelism(20)
.setMaxParallelism(OptionalInt.of(10))
.build();
@@ -212,7 +212,7 @@ public static WriteMethod from(@Nullable String writeMethod) {
private int channelPoolSize = 1;
private com.google.common.base.Optional<Integer> flowControlWindowBytes =
com.google.common.base.Optional.absent();
-private boolean enableReadSessionCaching = false;
+private boolean enableReadSessionCaching = true;
private SparkBigQueryProxyAndHttpConfig sparkBigQueryProxyAndHttpConfig;
private CompressionCodec arrowCompressionCodec = DEFAULT_ARROW_COMPRESSION_CODEC;
private WriteMethod writeMethod = DEFAULT_WRITE_METHOD;
@@ -490,7 +490,7 @@ public static SparkBigQueryConfig from(
.transform(Integer::parseInt)
.or(defaultChannelPoolSize);
config.enableReadSessionCaching =
-getAnyBooleanOption(globalOptions, options, "enableReadSessionCaching", false);
+getAnyBooleanOption(globalOptions, options, "enableReadSessionCaching", true);

String arrowCompressionCodecParam =
getAnyOption(globalOptions, options, ARROW_COMPRESSION_CODEC_OPTION)
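For context, `getAnyBooleanOption` resolves the flag from the per-read options and the session-wide options before falling back to the default that this commit flips to `true`. A minimal sketch of that lookup, with the precedence order assumed for illustration rather than taken from the connector's source:

```java
import java.util.Map;
import java.util.Optional;

public class OptionResolutionSketch {

  // Per-read options win over session-wide options; the default applies only
  // when neither map contains the key. (Precedence assumed, not copied.)
  static boolean anyBooleanOption(
      Map<String, String> globalOptions,
      Map<String, String> options,
      String name,
      boolean defaultValue) {
    return Optional.ofNullable(options.get(name))
        .or(() -> Optional.ofNullable(globalOptions.get(name)))
        .map(Boolean::parseBoolean)
        .orElse(defaultValue);
  }

  public static void main(String[] args) {
    // With the flipped default, an empty configuration now enables caching.
    System.out.println(anyBooleanOption(Map.of(), Map.of(), "enableReadSessionCaching", true));
    // An explicit per-read option still wins.
    System.out.println(anyBooleanOption(
        Map.of(), Map.of("enableReadSessionCaching", "false"), "enableReadSessionCaching", true));
  }
}
```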
@@ -343,7 +343,7 @@ public Filter[] getAllFilters() {
}

/**
-* Re-creates the read-sesion and re-computes partitions for dynamic partition pruning
+* Re-creates the read session and re-computes partitions for dynamic partition pruning
*
* @param filters dynamic partition pruning filters
* @return new planned partitions if dynamic partition pruning goes through, else returns empty
