Skip to content

Commit

Permalink
[CELEBORN-1642][CIP-11] Support multiple worker tags
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Current TagsManager code only supported one tags for selecting tagged workers. This change will enable support of passing multiple tags to TagsManager. Multiple tags will be evaluated as "AND" expression i.e only workers tagged with all the passed tags will be selected.

Support for more schemes will be added in follow up PRs.

### Why are the changes needed?
https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UTs

Closes #2850 from s0nskar/CELEBORN-1642.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
  • Loading branch information
s0nskar authored and FMX committed Oct 28, 2024
1 parent 7685fa7 commit e51b0c4
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.celeborn.service.deploy.master.tags

import java.util
import java.util.{Set => JSet}
import java.util.{Collections, Set => JSet}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Predicate
import java.util.stream.Collectors
Expand All @@ -43,19 +43,27 @@ class TagsManager extends Logging {

if (tags.isEmpty) {
logWarning("No tags provided")
return new util.ArrayList[WorkerInfo]()
return Collections.emptyList()
}

// TODO: Support multiple tags (CELEBORN-1642)
val tag = tags(0)
val workersForTag = tagStore.get(tag)
if (workersForTag == null) {
logWarning(s"Tag $tag not found in cluster")
return new util.ArrayList[WorkerInfo]()
var workersForTags: Option[JSet[String]] = None
tags.foreach { tag =>
val taggedWorkers = tagStore.getOrDefault(tag, Collections.emptySet())
workersForTags match {
case Some(w) =>
w.retainAll(taggedWorkers)
case _ =>
workersForTags = Some(taggedWorkers)
}
}

if (workersForTags.isEmpty) {
logWarning(s"No workers for tags: $tagExpr found in cluster")
return Collections.emptyList()
}

val workerTagsPredicate = new Predicate[WorkerInfo] {
override def test(w: WorkerInfo): Boolean = workersForTag.contains(w.toUniqueId())
override def test(w: WorkerInfo): Boolean = workersForTags.get.contains(w.toUniqueId())
}
workers.stream().filter(workerTagsPredicate).collect(Collectors.toList())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,31 @@ class TagsManagerSuite extends AnyFunSuite
assert(tags.contains(TAG2))
}
}

test("test tags expression with multiple tags") {
tagsManager = new TagsManager()

// Tag1
val TAG1 = "tag1"
tagsManager.addTagToWorker(TAG1, WORKER1.toUniqueId())
tagsManager.addTagToWorker(TAG1, WORKER2.toUniqueId())

// Tag2
val TAG2 = "tag2"
tagsManager.addTagToWorker(TAG2, WORKER2.toUniqueId())
tagsManager.addTagToWorker(TAG2, WORKER3.toUniqueId())

{
val taggedWorkers = tagsManager.getTaggedWorkers("tag1,tag2", workers)
assert(taggedWorkers.size == 1)
assert(!taggedWorkers.contains(WORKER1))
assert(taggedWorkers.contains(WORKER2))
assert(!taggedWorkers.contains(WORKER3))
}

{
val taggedWorkers = tagsManager.getTaggedWorkers("tag1,tag3", workers)
assert(taggedWorkers.size == 0)
}
}
}

0 comments on commit e51b0c4

Please sign in to comment.