Set parallelism for the parallelize job in recursiveListDirs
zsxwing committed Sep 23, 2024
1 parent 1753cb5 commit 22a8433
Showing 1 changed file with 4 additions and 1 deletion.
@@ -243,7 +243,10 @@ object DeltaFileOperations extends DeltaLogging {
     import org.apache.spark.sql.delta.implicits._
     if (subDirs.isEmpty) return spark.emptyDataset[SerializableFileStatus]
     val listParallelism = fileListingParallelism.getOrElse(spark.sparkContext.defaultParallelism)
-    val dirsAndFiles = spark.sparkContext.parallelize(subDirs).mapPartitions { dirs =>
+    val subDirsParallelism = subDirs.length.min(spark.sparkContext.defaultParallelism)
+    val dirsAndFiles = spark.sparkContext.parallelize(
+      subDirs,
+      subDirsParallelism).mapPartitions { dirs =>
       val logStore = LogStore(SparkEnv.get.conf, hadoopConf.value.value)
       listUsingLogStore(
         logStore,
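For context, here is a minimal standalone sketch (not part of the commit) of why the explicit slice count matters: without it, parallelize falls back to defaultParallelism, so a short list of directories is spread over mostly empty partitions, each of which still costs a scheduled task. The ParallelizeSliceDemo object, the local[8] session, and the example paths are illustrative assumptions, not code from the repository.

import org.apache.spark.sql.SparkSession

object ParallelizeSliceDemo {
  def main(args: Array[String]): Unit = {
    // Local session with 8 threads, so sc.defaultParallelism is 8 here.
    val spark = SparkSession.builder()
      .appName("parallelize-slices-demo")
      .master("local[8]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical directory list standing in for `subDirs` in the real code.
    val subDirs = Seq("s3://bucket/a", "s3://bucket/b", "s3://bucket/c")

    // Without an explicit slice count, parallelize uses defaultParallelism,
    // so 3 elements are spread over 8 partitions and 5 tasks do nothing.
    val unbounded = sc.parallelize(subDirs)
    println(s"default partitions: ${unbounded.getNumPartitions}") // 8

    // The commit's approach: cap the slice count at
    // min(subDirs.length, defaultParallelism), so no empty partitions
    // (and thus no empty listing tasks) are scheduled for small inputs.
    val capped = sc.parallelize(subDirs, subDirs.length.min(sc.defaultParallelism))
    println(s"capped partitions:  ${capped.getNumPartitions}") // 3

    spark.stop()
  }
}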
