GC combine to one command and handle deduplications #2196
Conversation
clients/spark/core/src/main/scala/io/treeverse/clients/GarbageCollector.scala
```diff
 Console.err.println(
-  "Usage: ... <repo_name> <runID> s3://storageNamespace/prepared_commits_table s3://storageNamespace/output_destination_table"
+  "Usage: ... <repo_name> <region>"
```
About the region: can we use `withForceGlobalBucketAccessEnabled` to start the client without a region?
That's for the previous SDK version. It seems they haven't solved it yet for the current version:
aws/aws-sdk-java-v2#2229
aws/aws-sdk-java-v2#52
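For reference, here is a minimal sketch of what the v1 SDK offered (this PR's client is on SDK v2, where the linked issues track the missing equivalent, so take this only as an illustration of the v1 API):

```scala
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

// SDK v1 only: a client that can reach buckets in any region without being
// told one up front. No v2 equivalent exists yet (see the issues above),
// which is why <region> must be passed as a command-line argument here.
val s3: AmazonS3 = AmazonS3ClientBuilder
  .standard()
  .withForceGlobalBucketAccessEnabled(true)
  .build()
```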
```scala
val addressesDFLocation = args(3)

val region = args(1)
val previousRunID = "" //args(2) // TODO(Guys): get previous runID from arguments or from storage
```
So we're letting go of the previous run feature for now?
No, it will be the first thing we add after the release.
```scala
}

private def subtractDeduplications(expired: Dataset[Row], activeRangesDF: Dataset[Row], conf: APIConfigurations, repo: String, spark: SparkSession): Dataset[Row] = {
  val expiredAddr: Set[String] = expired.select("address").collect().map(_.getString(0)).toSet
```
Suggested change:

```diff
-    val expiredAddr: Set[String] = expired.select("address").collect().map(_.getString(0)).toSet
+    val deleteCandidateAddresses: Set[String] = expired.select("address").collect().map(_.getString(0)).toSet
```
```scala
val ranges: Seq[String] = activeRangesDF.select("range_id").collect().map(_.getString(0)).toSeq.distinct
val rangesRDD = spark.sparkContext.parallelize(ranges)

val activeAddresses = rangesRDD.flatMap(range => {
```
Suggested change:

```diff
-    val activeAddresses =rangesRDD.flatMap(range=> {
+    val activeDeleteCandidateAddresses = activeRangeRDD.flatMap(range => {
```
I am not sure about the code that selects things; IIUC it will not let Spark do its thing and parallelize the run properly :-(
```scala
val location =
  new ApiClient(conf.apiURL, conf.accessKey, conf.secretKey).getRangeURL(repo, rangeID)
SSTableReader
  .forRange(new Configuration(), location)
```
Is this truly the configuration we want here?
```scala
private def subtractDeduplications(expired: Dataset[Row], activeRangesDF: Dataset[Row], conf: APIConfigurations, repo: String, spark: SparkSession): Dataset[Row] = {
  val expiredAddr: Set[String] = expired.select("address").collect().map(_.getString(0)).toSet
  val ranges: Seq[String] = activeRangesDF.select("range_id").collect().map(_.getString(0)).toSeq.distinct
```
Not sure I understand: doesn't `collect` directly move everything to the driver program and destroy parallelism? At the very least, I would run `distinct` or its equivalent on the cluster.
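A sketch of keeping the deduplication on the cluster instead of collecting first (assuming `activeRangesDF` has a `range_id` column as in the quoted code):

```scala
// Deduplicate as a distributed shuffle; only the already-distinct ids
// ever reach the driver (and only if something later collects them).
val rangesRDD = activeRangesDF
  .select("range_id")
  .distinct()
  .rdd
  .map(_.getString(0))
```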
```scala
val activeAddresses = rangesRDD.flatMap(range => {
  getEntryAddressesIfInSet(range, conf, repo, expiredAddr)
}).map(x => Row(x))
```
This looks like a flatMap with side data. I think it will be somewhat tricky, because `expiredAddr` needs to be broadcast to all the workers. It looks like some kind of JOIN, so I would prefer if you could write `ranges` as an RDD (or any other Spark table, I don't really know the difference...) and join with that here.
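One way the join-based version could look (a sketch only; `entryAddressesOfRange` is a hypothetical stand-in for expanding a range id into the entry addresses it contains, and `spark` is the session in scope):

```scala
import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Hypothetical helper: expand one range id into its entry addresses.
def entryAddressesOfRange(rangeID: String): Seq[String] = ???

def subtractActive(expired: Dataset[Row], activeRangesDF: Dataset[Row],
                   spark: SparkSession): Dataset[Row] = {
  import spark.implicits._

  // Expand ranges to addresses on the workers; nothing is collected to
  // the driver and no Set has to be broadcast by hand.
  val activeAddressesDF = activeRangesDF
    .select("range_id").distinct().as[String]
    .flatMap(entryAddressesOfRange _)
    .toDF("address")

  // Expired addresses that appear in no active range: a left anti join.
  expired.join(activeAddressesDF, Seq("address"), "left_anti")
}
```

Spark then plans the anti-join itself, and will broadcast the smaller side automatically when it is under the broadcast-join threshold.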
Force-pushed 9c19252 to 02ab338.
Thanks!
Please add a comment on l. 42 about what an empty location means.
```scala
if (metaRangeID != "") {
  val metaRange = metadataApi.getMetaRange(repoName, metaRangeID)
  val location = metaRange.getLocation
  URI.create(getStorageNamespace(repoName) + "/" + location).normalize().toString
```
As you probably know, I am not a fan of constructing URIs by hand and prefer `resolve`, which is specified by a standard. The issue here is sensitivity to trailing and leading slashes.
(Not a blocker)
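A sketch of the `resolve`-based alternative, using the names from the quoted snippet (the trailing-slash handling is the assumption being made explicit):

```scala
import java.net.URI

// resolve() follows RFC 3986 reference resolution. The trailing slash on
// the base is what makes `location` resolve *under* the namespace instead
// of replacing its last path segment.
val ns = getStorageNamespace(repoName)
val base = URI.create(if (ns.endsWith("/")) ns else ns + "/")
val metaRangeLocation = base.resolve(location).normalize().toString
```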
```scala
    (new String(range.id), range.message.minKey.toByteArray, range.message.maxKey.toByteArray)
  )
  .toSet
if (location == "") Set[(String, Array[Byte], Array[Byte])]()
```
I don't understand: when is `location == ""` a valid return? If it has special meaning, please add a comment to document that meaning.
@arielshaqed, thanks for the review, PTAL
Cool! Thanks...