From 19650621f6a5a7cb30e3e6119e94e81bfb93765d Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Wed, 22 Jan 2025 16:22:54 -0500 Subject: [PATCH 1/4] Supply gcs-connector options from default Configuration --- build.sbt | 2 + .../scala/com/spotify/scio/ScioContext.scala | 82 +++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/build.sbt b/build.sbt index 8f990bb2e8..12134281a0 100644 --- a/build.sbt +++ b/build.sbt @@ -837,6 +837,8 @@ lazy val `scio-core` = project "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion % Provided, "org.apache.beam" % s"beam-runners-spark-$sparkMajorVersion" % beamVersion % Provided, "org.apache.beam" % "beam-sdks-java-extensions-google-cloud-platform-core" % beamVersion % Provided, + "org.apache.hadoop" % "hadoop-common" % hadoopVersion % Provided, + "com.google.cloud.bigdataoss" % "gcs-connector" % s"hadoop2-$bigdataossVersion" % Provided, // test "com.lihaoyi" %% "fansi" % fansiVersion % Test, "com.lihaoyi" %% "pprint" % pprintVersion % Test, diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala index ce3e169b6a..96612e74f5 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala @@ -48,6 +48,7 @@ import scala.collection.mutable.{Buffer => MBuffer} import scala.concurrent.duration.Duration import scala.io.Source import scala.reflect.ClassTag +import scala.util.chaining._ import scala.util.control.NoStackTrace import scala.util.{Failure, Success, Try} import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions @@ -454,6 +455,87 @@ class ScioContext private[scio] ( o.setScioVersion(BuildInfo.version) } + { + import org.apache.hadoop.conf.Configuration + import com.google.cloud.hadoop.fs.gcs.{GoogleHadoopFileSystemConfiguration => GfsConfig} + import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions + + try { + // If Hadoop is on the classpath, try to parse default gcs-connector options + val config = new Configuration() + val o = optionsAs[GcsOptions] + + o.setGoogleCloudStorageReadOptions( + GoogleCloudStorageReadOptions + .builder() + .pipe(o => + Option(config.get(GfsConfig.GCS_INPUT_STREAM_FAST_FAIL_ON_NOT_FOUND_ENABLE.getKey)) + .map(_.toBoolean) + .fold(o)(o.setFastFailOnNotFound) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE.getKey)) + .map(_.toBoolean) + .fold(o)(o.setSupportGzipEncoding) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT.getKey)) + .map(_.toLong) + .fold(o)(o.setInplaceSeekLimit) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_INPUT_STREAM_FADVISE.getKey)) + .map(GoogleCloudStorageReadOptions.Fadvise.valueOf) + .fold(o)(o.setFadvise) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE.getKey)) + .map(_.toInt) + .fold(o)(o.setMinRangeRequestSize) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_GRPC_CHECKSUMS_ENABLE.getKey)) + .map(_.toBoolean) + .fold(o)(o.setGrpcChecksumsEnabled) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_GRPC_READ_TIMEOUT_MS.getKey)) + .map(_.toLong) + .fold(o)(o.setGrpcReadTimeoutMillis) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_GRPC_READ_MESSAGE_TIMEOUT_MS.getKey)) + .map(_.toLong) + .fold(o)(o.setGrpcReadMessageTimeoutMillis) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_GRPC_READ_METADATA_TIMEOUT_MS.getKey)) + .map(_.toLong) + .fold(o)(o.setGrpcReadMetadataTimeoutMillis) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_GRPC_READ_ZEROCOPY_ENABLE.getKey)) + .map(_.toBoolean) + .fold(o)(o.setGrpcReadZeroCopyEnabled) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_TRACE_LOG_ENABLE.getKey)) + .map(_.toBoolean) + .fold(o)(o.setTraceLogEnabled) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_TRACE_LOG_TIME_THRESHOLD_MS.getKey)) + .map(_.toLong) + .fold(o)(o.setTraceLogTimeThreshold) + ) + .build() + ) + } catch { + // Hadoop and/or gcs-connector is excluded from classpath, do not try to set options + case _: ClassNotFoundException | _: NoSuchMethodException => + } + } + private[scio] def labels: Map[String, String] = (for { // Check if class is in classpath From 31352de8eab5ac301b90f4254aa6a194087ebdbf Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Wed, 22 Jan 2025 16:46:33 -0500 Subject: [PATCH 2/4] fix exception --- scio-core/src/main/scala/com/spotify/scio/ScioContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala index 96612e74f5..e429491a74 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala @@ -532,7 +532,7 @@ class ScioContext private[scio] ( ) } catch { // Hadoop and/or gcs-connector is excluded from classpath, do not try to set options - case _: ClassNotFoundException | _: NoSuchMethodException => + case _: NoClassDefFoundError | _: NoSuchMethodException => } } From bc11991d177afcc991551af5af5949c0b7ea5c1e Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 23 Jan 2025 15:10:08 -0500 Subject: [PATCH 3/4] declare all dependencies --- build.sbt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/build.sbt b/build.sbt index 12134281a0..cbcda63ef9 100644 --- a/build.sbt +++ b/build.sbt @@ -839,6 +839,8 @@ lazy val `scio-core` = project "org.apache.beam" % "beam-sdks-java-extensions-google-cloud-platform-core" % beamVersion % Provided, "org.apache.hadoop" % "hadoop-common" % hadoopVersion % Provided, "com.google.cloud.bigdataoss" % "gcs-connector" % s"hadoop2-$bigdataossVersion" % Provided, + "com.google.cloud.bigdataoss" % "gcsio" % bigdataossVersion % Provided, + "com.google.cloud.bigdataoss" % "util-hadoop" % s"hadoop2-$bigdataossVersion" % Provided, // test "com.lihaoyi" %% "fansi" % fansiVersion % Test, "com.lihaoyi" %% "pprint" % pprintVersion % Test, From 898b16e089b0bcf5928b5263691e08e3242c9ce3 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Fri, 31 Jan 2025 12:08:41 -0500 Subject: [PATCH 4/4] link PR in Todo --- scio-core/src/main/scala/com/spotify/scio/ScioContext.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala index e429491a74..3157a1b826 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala @@ -465,6 +465,7 @@ class ScioContext private[scio] ( val config = new Configuration() val o = optionsAs[GcsOptions] + // Todo replace with built-in parser from gcsio when GoogleCloudDataproc/hadoop-connectors#1294 is merged o.setGoogleCloudStorageReadOptions( GoogleCloudStorageReadOptions .builder()