Format and clean imports
guy-har committed Jul 5, 2021
1 parent 316a910 commit 02ab338
Showing 1 changed file with 31 additions and 21 deletions.
@@ -1,10 +1,12 @@
package io.treeverse.clients

import com.google.protobuf.timestamp.Timestamp
import io.treeverse.clients.LakeFSContext.{LAKEFS_CONF_API_ACCESS_KEY_KEY, LAKEFS_CONF_API_SECRET_KEY_KEY, LAKEFS_CONF_API_URL_KEY}
import io.treeverse.clients.LakeFSContext.{
LAKEFS_CONF_API_ACCESS_KEY_KEY,
LAKEFS_CONF_API_SECRET_KEY_KEY,
LAKEFS_CONF_API_URL_KEY
}
import org.apache.hadoop.conf.Configuration
import org.apache.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{SparkSession, _}
@@ -103,21 +105,19 @@ object GarbageCollector {
}

def getEntryAddressesIfInSet(
rangeID: String,
conf: APIConfigurations,
repo: String,
set:Set[String]
): Set[String] = {
rangeID: String,
conf: APIConfigurations,
repo: String,
set: Set[String]
): Set[String] = {

val location =
new ApiClient(conf.apiURL, conf.accessKey, conf.secretKey).getRangeURL(repo, rangeID)
SSTableReader
.forRange(new Configuration(), location)
.newIterator()
.filter(x =>set.contains(x.message.address))
.map(a =>
new String(a.message.address),
)
.filter(x => set.contains(x.message.address))
.map(a => new String(a.message.address))
.toSet
}

@@ -235,16 +235,25 @@ object GarbageCollector {
subtractDeduplications(expired, activeRangesDF, conf, repo, spark)
}

private def subtractDeduplications(expired: Dataset[Row], activeRangesDF: Dataset[Row], conf: APIConfigurations, repo: String, spark: SparkSession) : Dataset[Row]= {
private def subtractDeduplications(
expired: Dataset[Row],
activeRangesDF: Dataset[Row],
conf: APIConfigurations,
repo: String,
spark: SparkSession
): Dataset[Row] = {
val expiredAddr: Set[String] = expired.select("address").collect().map(_.getString(0)).toSet
val ranges : Seq[String] = activeRangesDF.select("range_id").collect().map(_.getString(0)).toSeq.distinct
val ranges: Seq[String] =
activeRangesDF.select("range_id").collect().map(_.getString(0)).toSeq.distinct
val rangesRDD = spark.sparkContext.parallelize(ranges)

val activeAddresses =rangesRDD.flatMap(range=> {
getEntryAddressesIfInSet(range, conf, repo, expiredAddr)
}).map(x => Row(x))
val activeAddresses = rangesRDD
.flatMap(range => {
getEntryAddressesIfInSet(range, conf, repo, expiredAddr)
})
.map(x => Row(x))

val schema = new StructType().add(StructField("address",StringType, true))
val schema = new StructType().add(StructField("address", StringType, true))
val activeDF = spark.createDataFrame(activeAddresses, schema)
expired.join(
activeDF,
@@ -264,7 +273,8 @@ object GarbageCollector {

val repo = args(0)
val region = args(1)
val previousRunID = "" //args(2) // TODO(Guys): get previous runID from arguments or from storage
val previousRunID =
"" //args(2) // TODO(Guys): get previous runID from arguments or from storage
val hc = spark.sparkContext.hadoopConfiguration
val apiURL = hc.get(LAKEFS_CONF_API_URL_KEY)
val accessKey = hc.get(LAKEFS_CONF_API_ACCESS_KEY_KEY)
@@ -274,7 +284,7 @@ object GarbageCollector {
val runID = res.getRunId

val gcCommitsLocation = ApiClient.translateS3(new URI(res.getGcCommitsLocation)).toString
val gcAddressesLocation =ApiClient.translateS3(new URI(res.getGcAddressesLocation)).toString
val gcAddressesLocation = ApiClient.translateS3(new URI(res.getGcAddressesLocation)).toString

val expiredAddresses = getExpiredAddresses(repo,
runID,
@@ -286,7 +296,7 @@ object GarbageCollector {
.partitionBy("run_id")
.mode(SaveMode.Overwrite)
.parquet(gcAddressesLocation)
S3BulkDeleter.remove(repo, gcAddressesLocation,runID, region, spark)
S3BulkDeleter.remove(repo, gcAddressesLocation, runID, region, spark)
}
}
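
A minimal, self-contained sketch of the subtraction pattern that subtractDeduplications in this diff implements: build a single-column DataFrame of addresses still referenced by active ranges, then remove those rows from the expired set. The object name, the example addresses, and the use of a left-anti join (the original join type is collapsed in this view) are assumptions for illustration, not lakeFS code.

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    object SubtractSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("subtract-sketch").getOrCreate()

        // Same single-column schema as the diff above.
        val schema = new StructType().add(StructField("address", StringType, true))

        // Candidate addresses considered expired by the GC run (illustrative values).
        val expired = spark.createDataFrame(
          spark.sparkContext.parallelize(Seq(Row("obj/a1"), Row("obj/a2"), Row("obj/a3"))),
          schema
        )

        // Addresses found to still be referenced by active ranges (illustrative values).
        val active = spark.createDataFrame(
          spark.sparkContext.parallelize(Seq(Row("obj/a2"))),
          schema
        )

        // "Expired minus active": keep only expired rows with no match in the active set.
        val toDelete = expired.join(active, Seq("address"), "left_anti")

        toDelete.show() // obj/a1, obj/a3
        spark.stop()
      }
    }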
