diff --git a/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala b/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala
index 299af9c5b47..aa7603f2132 100644
--- a/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala
+++ b/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala
@@ -1,10 +1,12 @@
 package io.treeverse.clients
 
 import com.google.protobuf.timestamp.Timestamp
-import io.treeverse.clients.LakeFSContext.{LAKEFS_CONF_API_ACCESS_KEY_KEY, LAKEFS_CONF_API_SECRET_KEY_KEY, LAKEFS_CONF_API_URL_KEY}
+import io.treeverse.clients.LakeFSContext.{
+  LAKEFS_CONF_API_ACCESS_KEY_KEY,
+  LAKEFS_CONF_API_SECRET_KEY_KEY,
+  LAKEFS_CONF_API_URL_KEY
+}
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{SparkSession, _}
@@ -103,21 +105,19 @@ object GarbageCollector {
   }
 
   def getEntryAddressesIfInSet(
-                                rangeID: String,
-                                conf: APIConfigurations,
-                                repo: String,
-                                set:Set[String]
-                              ): Set[String] = {
+      rangeID: String,
+      conf: APIConfigurations,
+      repo: String,
+      set: Set[String]
+  ): Set[String] = {
     val location =
       new ApiClient(conf.apiURL, conf.accessKey, conf.secretKey).getRangeURL(repo, rangeID)
 
     SSTableReader
       .forRange(new Configuration(), location)
       .newIterator()
-      .filter(x =>set.contains(x.message.address))
-      .map(a =>
-        new String(a.message.address),
-      )
+      .filter(x => set.contains(x.message.address))
+      .map(a => new String(a.message.address))
       .toSet
   }
 
@@ -235,16 +235,25 @@ object GarbageCollector {
     subtractDeduplications(expired, activeRangesDF, conf, repo, spark)
   }
 
-  private def subtractDeduplications(expired: Dataset[Row], activeRangesDF: Dataset[Row], conf: APIConfigurations, repo: String, spark: SparkSession) : Dataset[Row]= {
+  private def subtractDeduplications(
+      expired: Dataset[Row],
+      activeRangesDF: Dataset[Row],
+      conf: APIConfigurations,
+      repo: String,
+      spark: SparkSession
+  ): Dataset[Row] = {
     val expiredAddr: Set[String] = expired.select("address").collect().map(_.getString(0)).toSet
-    val ranges : Seq[String] = activeRangesDF.select("range_id").collect().map(_.getString(0)).toSeq.distinct
+    val ranges: Seq[String] =
+      activeRangesDF.select("range_id").collect().map(_.getString(0)).toSeq.distinct
     val rangesRDD = spark.sparkContext.parallelize(ranges)
 
-    val activeAddresses =rangesRDD.flatMap(range=> {
-      getEntryAddressesIfInSet(range, conf, repo, expiredAddr)
-    }).map(x => Row(x))
+    val activeAddresses = rangesRDD
+      .flatMap(range => {
+        getEntryAddressesIfInSet(range, conf, repo, expiredAddr)
+      })
+      .map(x => Row(x))
 
-    val schema = new StructType().add(StructField("address",StringType, true))
+    val schema = new StructType().add(StructField("address", StringType, true))
     val activeDF = spark.createDataFrame(activeAddresses, schema)
     expired.join(
       activeDF,
@@ -264,7 +273,8 @@
 
     val repo = args(0)
     val region = args(1)
-    val previousRunID = "" //args(2) // TODO(Guys): get previous runID from arguments or from storage
+    val previousRunID =
+      "" //args(2) // TODO(Guys): get previous runID from arguments or from storage
     val hc = spark.sparkContext.hadoopConfiguration
     val apiURL = hc.get(LAKEFS_CONF_API_URL_KEY)
     val accessKey = hc.get(LAKEFS_CONF_API_ACCESS_KEY_KEY)
@@ -274,7 +284,7 @@
     val runID = res.getRunId
     val gcCommitsLocation =
      ApiClient.translateS3(new URI(res.getGcCommitsLocation)).toString
-    val gcAddressesLocation =ApiClient.translateS3(new URI(res.getGcAddressesLocation)).toString
+    val gcAddressesLocation = ApiClient.translateS3(new URI(res.getGcAddressesLocation)).toString
     val expiredAddresses =
       getExpiredAddresses(repo,
                           runID,
@@ -286,7 +296,7 @@ object GarbageCollector {
       .partitionBy("run_id")
       .mode(SaveMode.Overwrite)
      .parquet(gcAddressesLocation)
 
-    S3BulkDeleter.remove(repo, gcAddressesLocation,runID, region, spark)
+    S3BulkDeleter.remove(repo, gcAddressesLocation, runID, region, spark)
   }
 }
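
For reference, a minimal self-contained sketch of the subtraction pattern that the reformatted subtractDeduplications implements: distribute range IDs, rebuild the still-referenced addresses as a one-column DataFrame with an explicit schema, and subtract them from the expired set. The "left_anti" join type and all sample names below are assumptions for illustration only; the hunk above truncates before the join arguments, so the actual join type is not shown in this diff.

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object SubtractActiveSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

  // Same single-column schema the patch builds for activeDF.
  val schema = new StructType().add(StructField("address", StringType, true))

  // Candidate addresses for deletion (stand-ins for the real expired set).
  val expired = spark.createDataFrame(
    spark.sparkContext.parallelize(Seq(Row("obj-1"), Row("obj-2"), Row("obj-3"))),
    schema
  )

  // Stand-in for getEntryAddressesIfInSet: pretend each "range" still
  // references obj-2, so it must survive the subtraction.
  val ranges = spark.sparkContext.parallelize(Seq("range-a", "range-b"))
  val activeAddresses = ranges
    .flatMap(_ => Set("obj-2"))
    .map(Row(_))
  val activeDF = spark.createDataFrame(activeAddresses, schema)

  // Keep only expired addresses no active range references: obj-1 and obj-3.
  expired.join(activeDF, Seq("address"), "left_anti").show()

  spark.stop()
}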