From b5898dc1c26a1d37823e7867556ab699e7224d24 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Thu, 1 Jul 2021 15:29:28 +0300 Subject: [PATCH 1/6] GC combine to one command and handle deduplications --- .../io/treeverse/clients/ApiClient.scala | 25 +++- .../treeverse/clients/GarbageCollector.scala | 139 ++++++++++++------ 2 files changed, 114 insertions(+), 50 deletions(-) diff --git a/clients/spark/core/src/main/scala/io/treeverse/clients/ApiClient.scala b/clients/spark/core/src/main/scala/io/treeverse/clients/ApiClient.scala index 5680ca27594..15b2abf96fd 100644 --- a/clients/spark/core/src/main/scala/io/treeverse/clients/ApiClient.scala +++ b/clients/spark/core/src/main/scala/io/treeverse/clients/ApiClient.scala @@ -2,6 +2,11 @@ package io.treeverse.clients import com.google.common.cache.CacheBuilder import io.lakefs.clients.api +import io.lakefs.clients.api.RetentionApi +import io.lakefs.clients.api.model.{ + GarbageCollectionPrepareRequest, + GarbageCollectionPrepareResponse +} import java.net.URI import java.util.concurrent.TimeUnit @@ -31,6 +36,7 @@ class ApiClient(apiUrl: String, accessKey: String, secretKey: String) { private val commitsApi = new api.CommitsApi(client) private val metadataApi = new api.MetadataApi(client) private val branchesApi = new api.BranchesApi(client) + private val retentionApi = new RetentionApi(client) private val storageNamespaceCache = CacheBuilder.newBuilder().expireAfterWrite(2, TimeUnit.MINUTES).build[String, String]() @@ -50,13 +56,24 @@ class ApiClient(apiUrl: String, accessKey: String, secretKey: String) { ) } + def prepareGarbageCollectionCommits( + repoName: String, + previousRunID: String + ): GarbageCollectionPrepareResponse = { + retentionApi.prepareGarbageCollectionCommits( + repoName, + new GarbageCollectionPrepareRequest().previousRunId(previousRunID) + ) + } + def getMetaRangeURL(repoName: String, commitID: String): String = { val commit = commitsApi.getCommit(repoName, commitID) val metaRangeID = commit.getMetaRangeId - - val metaRange = metadataApi.getMetaRange(repoName, metaRangeID) - val location = metaRange.getLocation - URI.create(getStorageNamespace(repoName) + "/" + location).normalize().toString + if (metaRangeID != "") { + val metaRange = metadataApi.getMetaRange(repoName, metaRangeID) + val location = metaRange.getLocation + URI.create(getStorageNamespace(repoName) + "/" + location).normalize().toString + } else "" } def getRangeURL(repoName: String, rangeID: String): String = { diff --git a/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala b/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala index 3c5fa93a247..087182a60c6 100644 --- a/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala +++ b/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala @@ -1,13 +1,12 @@ package io.treeverse.clients import com.google.protobuf.timestamp.Timestamp -import io.treeverse.clients.LakeFSContext.{ - LAKEFS_CONF_API_ACCESS_KEY_KEY, - LAKEFS_CONF_API_SECRET_KEY_KEY, - LAKEFS_CONF_API_URL_KEY -} +import io.treeverse.clients.LakeFSContext.{LAKEFS_CONF_API_ACCESS_KEY_KEY, LAKEFS_CONF_API_SECRET_KEY_KEY, LAKEFS_CONF_API_URL_KEY} import org.apache.hadoop.conf.Configuration +import org.apache.spark +import org.apache.spark.rdd.RDD import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{SparkSession, _} import 
software.amazon.awssdk.core.client.config.ClientOverrideConfiguration import software.amazon.awssdk.core.retry.RetryPolicy @@ -28,7 +27,6 @@ object GarbageCollector { .option("header", value = true) .option("inferSchema", value = true) .csv(commitDFLocation) - .where(col("run_id") === runID) } private def getRangeTuples( @@ -38,13 +36,15 @@ object GarbageCollector { ): Set[(String, Array[Byte], Array[Byte])] = { val location = new ApiClient(conf.apiURL, conf.accessKey, conf.secretKey).getMetaRangeURL(repo, commitID) - SSTableReader - .forMetaRange(new Configuration(), location) - .newIterator() - .map(range => - (new String(range.id), range.message.minKey.toByteArray, range.message.maxKey.toByteArray) - ) - .toSet + if (location == "") Set[(String, Array[Byte], Array[Byte])]() + else + SSTableReader + .forMetaRange(new Configuration(), location) + .newIterator() + .map(range => + (new String(range.id), range.message.minKey.toByteArray, range.message.maxKey.toByteArray) + ) + .toSet } def getRangesDFFromCommits( @@ -102,6 +102,26 @@ object GarbageCollector { .toSet } + def getEntryAddressesIfInSet( + rangeID: String, + conf: APIConfigurations, + repo: String, + set:Set[String] + ): Set[String] = { + + val location = + new ApiClient(conf.apiURL, conf.accessKey, conf.secretKey).getRangeURL(repo, rangeID) + SSTableReader + .forRange(new Configuration(), location) + .newIterator() + .filter(!_.message.addressType.isRelative) + .filter(x =>set.contains(x.message.address)) + .map(a => + new String(a.message.address), + ) + .toSet + } + /** @param leftRangeIDs * @param rightRangeIDs * @param conf @@ -209,48 +229,76 @@ object GarbageCollector { ): Dataset[Row] = { val commitsDF = getCommitsDF(runID, commitDFLocation, spark) val rangesDF = getRangesDFFromCommits(commitsDF, repo, conf) - getExpiredEntriesFromRanges(rangesDF, conf, repo) + val expired = getExpiredEntriesFromRanges(rangesDF, conf, repo) + + // validate deduplications + val activeRangesDF = rangesDF.where("!expired") + subtractDeduplications(expired, activeRangesDF, conf, repo, spark) + } + + private def subtractDeduplications(expired: Dataset[Row], activeRangesDF: Dataset[Row], conf: APIConfigurations, repo: String, spark: SparkSession) : Dataset[Row]= { + val expiredAddr: Set[String] = expired.select("address").collect().map(_.getString(0)).toSet + val ranges : Seq[String] = activeRangesDF.select("range_id").collect().map(_.getString(0)).toSeq.distinct + val rangesRDD = spark.sparkContext.parallelize(ranges) + + val activeAddresses =rangesRDD.flatMap(range=> { + getEntryAddressesIfInSet(range, conf, repo, expiredAddr) + }).map(x => Row(x)) + + val schema = new StructType().add(StructField("address",StringType, true)) + val activeDF = spark.createDataFrame(activeAddresses, schema) + expired.join( + activeDF, + expired("address") === activeDF("address"), + "leftanti" + ) } def main(args: Array[String]) { val spark = SparkSession.builder().getOrCreate() - if (args.length != 4) { + if (args.length != 2) { Console.err.println( - "Usage: ... s3://storageNamespace/prepared_commits_table s3://storageNamespace/output_destination_table" + "Usage: ... 
" ) System.exit(1) } val repo = args(0) - val runID = args(1) - val commitDFLocation = args(2) - val addressesDFLocation = args(3) - + val region = args(1) + val previousRunID = "" //args(2) // TODO(Guys): get previous runID from arguments or from storage val hc = spark.sparkContext.hadoopConfiguration val apiURL = hc.get(LAKEFS_CONF_API_URL_KEY) val accessKey = hc.get(LAKEFS_CONF_API_ACCESS_KEY_KEY) val secretKey = hc.get(LAKEFS_CONF_API_SECRET_KEY_KEY) + val res = new ApiClient(apiURL, accessKey, secretKey) + .prepareGarbageCollectionCommits(repo, previousRunID) + val runID = res.getRunId + + val gcCommitsLocation = ApiClient.translateS3(new URI(res.getGcCommitsLocation)).toString + val gcAddressesLocation =ApiClient.translateS3(new URI(res.getGcAddressesLocation)).toString + val expiredAddresses = getExpiredAddresses(repo, runID, - commitDFLocation, + gcCommitsLocation, spark, APIConfigurations(apiURL, accessKey, secretKey) ).withColumn("run_id", lit(runID)) expiredAddresses.write .partitionBy("run_id") - .mode(SaveMode.Append) - .parquet(addressesDFLocation) // TODO(Guys): consider changing to overwrite + .mode(SaveMode.Overwrite) + .parquet(gcAddressesLocation) + S3BulkDeleter.remove(repo, gcAddressesLocation,runID, region, spark) } } object S3BulkDeleter { - def repartitionBySize(df: DataFrame, maxSize: Int, column: String): DataFrame = { + private def repartitionBySize(df: DataFrame, maxSize: Int, column: String): DataFrame = { val nRows = df.count() val nPartitions = math.max(1, math.floor(nRows / maxSize)).toInt df.repartitionByRange(nPartitions, col(column)) } - def delObjIteration( + private def delObjIteration( bucket: String, keys: Seq[String], s3Client: S3Client, @@ -297,21 +345,9 @@ object S3BulkDeleter { }) } - def main(args: Array[String]): Unit = { - if (args.length != 5) { - Console.err.println( - "Usage: ... s3://storageNamespace/prepared_addresses_table s3://storageNamespace/output_destination_table" - ) - System.exit(1) - } + def remove(repo: String, location: String, runID: String, region: String, spark: SparkSession) = { val MaxBulkSize = 1000 val awsRetries = 1000 - val repo = args(0) - val runID = args(1) - val region = args(2) - val addressesDFLocation = args(3) - val deletedAddressesDFLocation = args(4) - val spark = SparkSession.builder().getOrCreate() val hc = spark.sparkContext.hadoopConfiguration val apiURL = hc.get(LAKEFS_CONF_API_URL_KEY) @@ -326,16 +362,27 @@ object S3BulkDeleter { if (addSuffixSlash.startsWith("/")) addSuffixSlash.substring(1) else addSuffixSlash val df = spark.read - .parquet(addressesDFLocation) + .parquet(location) .where(col("run_id") === runID) .where(col("relative") === true) val res = - bulkRemove(df, MaxBulkSize, spark, bucket, region, awsRetries, snPrefix).toDF("addresses") - res - .withColumn("run_id", lit(runID)) - .write - .partitionBy("run_id") - .mode(SaveMode.Append) - .parquet(deletedAddressesDFLocation) + bulkRemove(df, MaxBulkSize, spark, bucket, region, awsRetries, snPrefix) + .toDF("addresses") + .collect() + } + + def main(args: Array[String]): Unit = { + if (args.length != 5) { + Console.err.println( + "Usage: ... 
s3://storageNamespace/prepared_addresses_table" + ) + System.exit(1) + } + val repo = args(0) + val runID = args(1) + val region = args(2) + val addressesDFLocation = args(3) + val spark = SparkSession.builder().getOrCreate() + remove(repo, addressesDFLocation, runID, region, spark) } } From 316a910db47dc0bd28dce44a600af296aa8e8123 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Mon, 5 Jul 2021 11:25:52 +0300 Subject: [PATCH 2/6] Bug fix --- .../src/main/scala/io/treeverse/clients/GarbageCollector.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala b/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala index 087182a60c6..299af9c5b47 100644 --- a/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala +++ b/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala @@ -114,7 +114,6 @@ object GarbageCollector { SSTableReader .forRange(new Configuration(), location) .newIterator() - .filter(!_.message.addressType.isRelative) .filter(x =>set.contains(x.message.address)) .map(a => new String(a.message.address), From 02ab338942215892c72deff80ea2bacd3cd454b3 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Mon, 5 Jul 2021 19:26:07 +0300 Subject: [PATCH 3/6] Format and clean imports --- .../treeverse/clients/GarbageCollector.scala | 52 +++++++++++-------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala b/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala index 299af9c5b47..aa7603f2132 100644 --- a/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala +++ b/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala @@ -1,10 +1,12 @@ package io.treeverse.clients import com.google.protobuf.timestamp.Timestamp -import io.treeverse.clients.LakeFSContext.{LAKEFS_CONF_API_ACCESS_KEY_KEY, LAKEFS_CONF_API_SECRET_KEY_KEY, LAKEFS_CONF_API_URL_KEY} +import io.treeverse.clients.LakeFSContext.{ + LAKEFS_CONF_API_ACCESS_KEY_KEY, + LAKEFS_CONF_API_SECRET_KEY_KEY, + LAKEFS_CONF_API_URL_KEY +} import org.apache.hadoop.conf.Configuration -import org.apache.spark -import org.apache.spark.rdd.RDD import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{SparkSession, _} @@ -103,21 +105,19 @@ object GarbageCollector { } def getEntryAddressesIfInSet( - rangeID: String, - conf: APIConfigurations, - repo: String, - set:Set[String] - ): Set[String] = { + rangeID: String, + conf: APIConfigurations, + repo: String, + set: Set[String] + ): Set[String] = { val location = new ApiClient(conf.apiURL, conf.accessKey, conf.secretKey).getRangeURL(repo, rangeID) SSTableReader .forRange(new Configuration(), location) .newIterator() - .filter(x =>set.contains(x.message.address)) - .map(a => - new String(a.message.address), - ) + .filter(x => set.contains(x.message.address)) + .map(a => new String(a.message.address)) .toSet } @@ -235,16 +235,25 @@ object GarbageCollector { subtractDeduplications(expired, activeRangesDF, conf, repo, spark) } - private def subtractDeduplications(expired: Dataset[Row], activeRangesDF: Dataset[Row], conf: APIConfigurations, repo: String, spark: SparkSession) : Dataset[Row]= { + private def subtractDeduplications( + expired: Dataset[Row], + activeRangesDF: Dataset[Row], + conf: APIConfigurations, + repo: String, + spark: 
SparkSession + ): Dataset[Row] = { val expiredAddr: Set[String] = expired.select("address").collect().map(_.getString(0)).toSet - val ranges : Seq[String] = activeRangesDF.select("range_id").collect().map(_.getString(0)).toSeq.distinct + val ranges: Seq[String] = + activeRangesDF.select("range_id").collect().map(_.getString(0)).toSeq.distinct val rangesRDD = spark.sparkContext.parallelize(ranges) - val activeAddresses =rangesRDD.flatMap(range=> { - getEntryAddressesIfInSet(range, conf, repo, expiredAddr) - }).map(x => Row(x)) + val activeAddresses = rangesRDD + .flatMap(range => { + getEntryAddressesIfInSet(range, conf, repo, expiredAddr) + }) + .map(x => Row(x)) - val schema = new StructType().add(StructField("address",StringType, true)) + val schema = new StructType().add(StructField("address", StringType, true)) val activeDF = spark.createDataFrame(activeAddresses, schema) expired.join( activeDF, @@ -264,7 +273,8 @@ object GarbageCollector { val repo = args(0) val region = args(1) - val previousRunID = "" //args(2) // TODO(Guys): get previous runID from arguments or from storage + val previousRunID = + "" //args(2) // TODO(Guys): get previous runID from arguments or from storage val hc = spark.sparkContext.hadoopConfiguration val apiURL = hc.get(LAKEFS_CONF_API_URL_KEY) val accessKey = hc.get(LAKEFS_CONF_API_ACCESS_KEY_KEY) @@ -274,7 +284,7 @@ object GarbageCollector { val runID = res.getRunId val gcCommitsLocation = ApiClient.translateS3(new URI(res.getGcCommitsLocation)).toString - val gcAddressesLocation =ApiClient.translateS3(new URI(res.getGcAddressesLocation)).toString + val gcAddressesLocation = ApiClient.translateS3(new URI(res.getGcAddressesLocation)).toString val expiredAddresses = getExpiredAddresses(repo, runID, @@ -286,7 +296,7 @@ object GarbageCollector { .partitionBy("run_id") .mode(SaveMode.Overwrite) .parquet(gcAddressesLocation) - S3BulkDeleter.remove(repo, gcAddressesLocation,runID, region, spark) + S3BulkDeleter.remove(repo, gcAddressesLocation, runID, region, spark) } } From 1ab38d77644006dc942e170662ed769cf2328a8c Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Tue, 6 Jul 2021 08:45:50 +0300 Subject: [PATCH 4/6] Fix due to wrong bad merge in rebase --- .../treeverse/clients/GarbageCollector.scala | 57 ++++++++----------- 1 file changed, 25 insertions(+), 32 deletions(-) diff --git a/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala b/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala index aa7603f2132..e05b69bf5d8 100644 --- a/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala +++ b/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala @@ -7,6 +7,7 @@ import io.treeverse.clients.LakeFSContext.{ LAKEFS_CONF_API_URL_KEY } import org.apache.hadoop.conf.Configuration +import org.apache.spark.rdd.RDD import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{SparkSession, _} @@ -69,14 +70,19 @@ object GarbageCollector { .distinct } - def getRangeAddresses(rangeID: String, conf: APIConfigurations, repo: String): Set[String] = { + def getRangeAddresses( + rangeID: String, + conf: APIConfigurations, + repo: String, + spark: SparkSession + ): Seq[String] = { val location = new ApiClient(conf.apiURL, conf.accessKey, conf.secretKey).getRangeURL(repo, rangeID) SSTableReader .forRange(new Configuration(), location) .newIterator() .map(a => new String(a.key)) - .toSet + .toSeq } def 
getEntryTuples( @@ -104,23 +110,6 @@ object GarbageCollector { .toSet } - def getEntryAddressesIfInSet( - rangeID: String, - conf: APIConfigurations, - repo: String, - set: Set[String] - ): Set[String] = { - - val location = - new ApiClient(conf.apiURL, conf.accessKey, conf.secretKey).getRangeURL(repo, rangeID) - SSTableReader - .forRange(new Configuration(), location) - .newIterator() - .filter(x => set.contains(x.message.address)) - .map(a => new String(a.message.address)) - .toSet - } - /** @param leftRangeIDs * @param rightRangeIDs * @param conf @@ -242,19 +231,17 @@ object GarbageCollector { repo: String, spark: SparkSession ): Dataset[Row] = { - val expiredAddr: Set[String] = expired.select("address").collect().map(_.getString(0)).toSet - val ranges: Seq[String] = - activeRangesDF.select("range_id").collect().map(_.getString(0)).toSeq.distinct - val rangesRDD = spark.sparkContext.parallelize(ranges) - - val activeAddresses = rangesRDD + val activeRangesRDD: RDD[String] = + activeRangesDF.select("range_id").rdd.distinct().map(x => x.getString(0)) + val activeAddresses: RDD[String] = activeRangesRDD .flatMap(range => { - getEntryAddressesIfInSet(range, conf, repo, expiredAddr) + getRangeAddresses(range, conf, repo, spark) }) - .map(x => Row(x)) - + .distinct() + val activeAddressesRows: RDD[Row] = activeAddresses.map(x => Row(x)) val schema = new StructType().add(StructField("address", StringType, true)) - val activeDF = spark.createDataFrame(activeAddresses, schema) + val activeDF = spark.createDataFrame(activeAddressesRows, schema) + // remove active addresses from delete candidates expired.join( activeDF, expired("address") === activeDF("address"), @@ -354,7 +341,13 @@ object S3BulkDeleter { }) } - def remove(repo: String, location: String, runID: String, region: String, spark: SparkSession) = { + def remove( + repo: String, + addressDFLocation: String, + runID: String, + region: String, + spark: SparkSession + ) = { val MaxBulkSize = 1000 val awsRetries = 1000 @@ -371,7 +364,7 @@ object S3BulkDeleter { if (addSuffixSlash.startsWith("/")) addSuffixSlash.substring(1) else addSuffixSlash val df = spark.read - .parquet(location) + .parquet(addressDFLocation) .where(col("run_id") === runID) .where(col("relative") === true) val res = @@ -381,7 +374,7 @@ object S3BulkDeleter { } def main(args: Array[String]): Unit = { - if (args.length != 5) { + if (args.length != 4) { Console.err.println( "Usage: ... 
s3://storageNamespace/prepared_addresses_table" ) From daaea3ab38982f0584b8d7da1c8852a02c625ad9 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Tue, 6 Jul 2021 10:28:51 +0300 Subject: [PATCH 5/6] Bug fix --- .../src/main/scala/io/treeverse/clients/GarbageCollector.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala b/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala index e05b69bf5d8..bd0da9c2cf7 100644 --- a/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala +++ b/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala @@ -81,7 +81,7 @@ object GarbageCollector { SSTableReader .forRange(new Configuration(), location) .newIterator() - .map(a => new String(a.key)) + .map(a => new String(a.message.address)) .toSeq } @@ -279,6 +279,7 @@ object GarbageCollector { spark, APIConfigurations(apiURL, accessKey, secretKey) ).withColumn("run_id", lit(runID)) + spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") expiredAddresses.write .partitionBy("run_id") .mode(SaveMode.Overwrite) From 122a269bd2aaefec622b15e979f9a55b1e36009a Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Tue, 6 Jul 2021 11:21:45 +0300 Subject: [PATCH 6/6] Code review changes --- .../main/scala/io/treeverse/clients/ApiClient.scala | 2 +- .../io/treeverse/clients/GarbageCollector.scala | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/clients/spark/core/src/main/scala/io/treeverse/clients/ApiClient.scala b/clients/spark/core/src/main/scala/io/treeverse/clients/ApiClient.scala index 15b2abf96fd..5139b2a93d2 100644 --- a/clients/spark/core/src/main/scala/io/treeverse/clients/ApiClient.scala +++ b/clients/spark/core/src/main/scala/io/treeverse/clients/ApiClient.scala @@ -72,7 +72,7 @@ class ApiClient(apiUrl: String, accessKey: String, secretKey: String) { if (metaRangeID != "") { val metaRange = metadataApi.getMetaRange(repoName, metaRangeID) val location = metaRange.getLocation - URI.create(getStorageNamespace(repoName) + "/" + location).normalize().toString + URI.create(getStorageNamespace(repoName) + "/").resolve(location).normalize().toString } else "" } diff --git a/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala b/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala index bd0da9c2cf7..cf374368ab6 100644 --- a/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala +++ b/clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala @@ -39,7 +39,8 @@ object GarbageCollector { ): Set[(String, Array[Byte], Array[Byte])] = { val location = new ApiClient(conf.apiURL, conf.accessKey, conf.secretKey).getMetaRangeURL(repo, commitID) - if (location == "") Set[(String, Array[Byte], Array[Byte])]() + // continue on empty location, empty location is a result of a commit with no metaRangeID (e.g 'Repository created' commit) + if (location == "") Set() else SSTableReader .forMetaRange(new Configuration(), location) @@ -73,15 +74,14 @@ object GarbageCollector { def getRangeAddresses( rangeID: String, conf: APIConfigurations, - repo: String, - spark: SparkSession + repo: String ): Seq[String] = { val location = new ApiClient(conf.apiURL, conf.accessKey, conf.secretKey).getRangeURL(repo, rangeID) SSTableReader .forRange(new Configuration(), location) .newIterator() - .map(a => new String(a.message.address)) + .map(a => 
a.message.address) .toSeq } @@ -219,7 +219,6 @@ object GarbageCollector { val rangesDF = getRangesDFFromCommits(commitsDF, repo, conf) val expired = getExpiredEntriesFromRanges(rangesDF, conf, repo) - // validate deduplications val activeRangesDF = rangesDF.where("!expired") subtractDeduplications(expired, activeRangesDF, conf, repo, spark) } @@ -235,7 +234,7 @@ object GarbageCollector { activeRangesDF.select("range_id").rdd.distinct().map(x => x.getString(0)) val activeAddresses: RDD[String] = activeRangesRDD .flatMap(range => { - getRangeAddresses(range, conf, repo, spark) + getRangeAddresses(range, conf, repo) }) .distinct() val activeAddressesRows: RDD[Row] = activeAddresses.map(x => Row(x)) @@ -279,7 +278,7 @@ object GarbageCollector { spark, APIConfigurations(apiURL, accessKey, secretKey) ).withColumn("run_id", lit(runID)) - spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") + spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic") expiredAddresses.write .partitionBy("run_id") .mode(SaveMode.Overwrite)
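
For reference, a minimal, self-contained sketch of the deduplication rule that subtractDeduplications (added in patch 1 and reworked in patch 4) expresses as a Spark left anti join: an expired address stays a delete candidate only if no active, non-expired range still references it. Everything below (the object name, the local master, and the example addresses) is invented for illustration; the real job derives both DataFrames from lakeFS range data rather than from in-memory sequences.

import org.apache.spark.sql.SparkSession

object DedupJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("gc-dedup-sketch").getOrCreate()
    import spark.implicits._

    // Addresses whose last referencing commit has expired (delete candidates).
    val expired = Seq("obj/a", "obj/b", "obj/c").toDF("address")

    // Addresses still referenced by at least one active range, e.g. because a
    // deduplicated object is shared with a commit that is not expired.
    val active = Seq("obj/b").toDF("address")

    // Left anti join: keep only candidates that do not appear among the active addresses.
    val safeToDelete = expired.join(active, expired("address") === active("address"), "leftanti")

    safeToDelete.show() // expected rows: obj/a, obj/c
    spark.stop()
  }
}

Keeping this as a join, rather than collecting addresses into a driver-side Set as the first version of the patch did, is what lets patch 4 scale the check: the active addresses stay distributed as an RDD/DataFrame and Spark executes the anti join.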
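
Patch 5 also sets spark.sql.sources.partitionOverwriteMode to "dynamic" right before the run_id-partitioned overwrite of gcAddressesLocation, presumably so that a new run replaces only its own run_id partition instead of truncating the whole table. A short sketch of that Spark behavior (available since Spark 2.3); the output path and rows are invented:

import org.apache.spark.sql.{SaveMode, SparkSession}

object DynamicOverwriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("gc-overwrite-sketch").getOrCreate()
    import spark.implicits._

    val out = "/tmp/gc_addresses_sketch" // stand-in for gcAddressesLocation

    // With "dynamic", SaveMode.Overwrite replaces only the partitions present in the
    // DataFrame being written; other run_id partitions under `out` are left untouched.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    val run1 = Seq(("obj/a", "run-1"), ("obj/b", "run-1")).toDF("address", "run_id")
    run1.write.partitionBy("run_id").mode(SaveMode.Overwrite).parquet(out)

    val run2 = Seq(("obj/c", "run-2")).toDF("address", "run_id")
    run2.write.partitionBy("run_id").mode(SaveMode.Overwrite).parquet(out)

    // Both the run-1 and run-2 partitions survive; under the default "static" mode the
    // second write would have dropped the run-1 partition as well.
    spark.read.parquet(out).show()
    spark.stop()
  }
}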