This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Commit

Parallel watch processing. (#2169)
* Parallel watch processing.

* Bug fix.

* Bugfix.

* Make the shard count and max threads a parameter.

Co-authored-by: Scott Crosby <scrosby@twosigma.com>
scrosby and Scott Crosby authored Sep 6, 2022
1 parent 86f1581 commit 637b734
Showing 4 changed files with 164 additions and 13 deletions.
131 changes: 131 additions & 0 deletions scheduler/java/com/twosigma/cook/kubernetes/ParallelWatchQueue.java
@@ -0,0 +1,131 @@
package com.twosigma.cook.kubernetes;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Implement a processor that can process watch events from k8s in parallel.
* Events are represented as Runnables (presumably as closures over some
* underlying event state). Events on the same shard are processed sequentially.
* Events on different shards are processed in parallel on the supplied
* ExecutorService. Backpressure is implemented. When there are too many
* outstanding events, submitting an event will block.
*/
public class ParallelWatchQueue {
/**
* Shard array
*/
final ArrayList<Shard> shards;
/**
* Implements backpressure. Semaphore is grabbed whenever queueing.
*/
final Semaphore queueSlotsLeft;
final ExecutorService executor;
final int shardCount;

/**
* Create a processor that can process watch events concurrently. It
* guarantees that events on the same shard are processed in a linear
* order. This code is thread-safe and can be called from multiple threads.
*
* @param executor The executor to which event runnables are submitted.
* @param maxOutstanding The maximum number of events that may be queued
* before backpressure kicks in and blocks further watch-event submission.
* @param maxShards How many shards to use.
*/
public ParallelWatchQueue(ExecutorService executor, int maxOutstanding, int maxShards) {
this.executor = executor;
// Immutable after construction.
this.shards = new ArrayList<>(maxShards);
this.shardCount = maxShards;
this.queueSlotsLeft = new Semaphore(maxOutstanding, true);
for (int ii = 0; ii < maxShards; ii++) {
shards.add(new Shard());
}
}

/**
* Submit a given watch event, represented as a Runnable, on the given
* shard number.
* We guarantee that processing is done asynchronously on a thread
* such that events submitted in a happensBefore order on the same
* shard number are processed in that same order.
*
* @param event The event operation.
* @param shardNum The shard number it is submitted on.
* @throws InterruptedException if interrupted while waiting for a queue slot.
*/
public void submitEvent(Runnable event, int shardNum) throws InterruptedException {
// Block if we are backpressuring.
queueSlotsLeft.acquire();
shards.get(shardNum).submitEvent(event);
}

/**
* Number of shards the workload is divided between.
* @return shard count.
*/
public int getShardCount() {
return shardCount;
}

/**
* Implements a single queue shard.
*/
private class Shard {
/* Used to lock adding events or processing events from a shard */
private final Lock lock = new ReentrantLock();
/**
* Implements the actual queue of outstanding events
*/
private final Deque<Runnable> deque = new ArrayDeque<Runnable>();

/**
* Process all queued events in this shard. This can be called
* from multiple threads. It internally locks, allowing only one
* thread to actually process this shard at a time.
*/
void processDeque() {
lock.lock();
try {
while (!deque.isEmpty()) {
var event = deque.removeFirst();
// Mark event as dequeued so we release any backpressure.
queueSlotsLeft.release();
executor.submit(event);
}
} finally {
lock.unlock();
}
}

/**
* Submit an event.
* <p>
* If two events are submitted with a happensBefore relationship, the locks guarantee
* that the events will be queued sequentially in the queue.
* <p>
* Because processDeque locks on the same lock, any events currently being processed
* on this shard will be processed before the new event is submitted.
* <p>
* We have an invariant that whenever a deque has an event on it, there is at least
* one runnable in the ExecutorService that will process this shard.
*
* @param event The runnable to run.
*/
void submitEvent(Runnable event) {
lock.lock();
try {
deque.addLast(event);
} finally {
lock.unlock();
}
executor.submit(this::processDeque);
}
}
}
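For reference, a minimal usage sketch of the new class from Clojure, mirroring the interop pattern in the pod-watch callback below. The executor, shard key, and event bodies are illustrative assumptions; only the constructor, getShardCount, and submitEvent calls come from the class above.

(import '(com.twosigma.cook.kubernetes ParallelWatchQueue)
        '(java.util.concurrent Executors))

;; Sketch only: 4 worker threads, at most 1000 queued events, 200 shards.
(let [executor (Executors/newFixedThreadPool 4)
      queue (ParallelWatchQueue. executor 1000 200)
      pod-name "example-pod"                                        ; illustrative shard key
      shard-num (mod (.hashCode pod-name) (.getShardCount queue))
      ^Runnable event-1 (fn [] (println "first update for" pod-name))
      ^Runnable event-2 (fn [] (println "second update for" pod-name))]
  ;; Events on the same shard are processed in submission order; submitEvent
  ;; blocks once maxOutstanding events are queued (backpressure).
  (.submitEvent queue event-1 shard-num)
  (.submitEvent queue event-2 shard-num)
  (.shutdown executor))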
32 changes: 23 additions & 9 deletions scheduler/src/cook/kubernetes/compute_cluster.clj
@@ -27,7 +27,7 @@
[metrics.timers :as timers]
[opentracing-clj.core :as tracing])
(:import (com.google.auth.oauth2 GoogleCredentials)
(com.twosigma.cook.kubernetes TokenRefreshingAuthenticator)
(com.twosigma.cook.kubernetes TokenRefreshingAuthenticator ParallelWatchQueue)
(io.kubernetes.client.openapi ApiClient)
(io.kubernetes.client.openapi.models V1Node V1Pod)
(io.kubernetes.client.util ClientBuilder KubeConfig)
@@ -214,15 +214,23 @@

(defn make-cook-pod-watch-callback
"Make a callback function that is passed to the pod-watch callback. This callback forwards changes to the cook.kubernetes.controller."
[kcc]
[{:keys [^ParallelWatchQueue parallel-watch-queue] :as kcc}]
(fn pod-watch-callback
[_ prev-pod pod]
[_ ^V1Pod prev-pod ^V1Pod pod]
(try
(if (nil? pod)
(controller/pod-deleted kcc prev-pod)
(controller/pod-update kcc pod))
(let [name (or (some-> prev-pod .getMetadata .getName)
(some-> pod .getMetadata .getName))
shardNum (mod (.hashCode name) (.getShardCount parallel-watch-queue))
^Runnable event (fn []
(try
(if (nil? pod)
(controller/pod-deleted kcc prev-pod)
(controller/pod-update kcc pod))
(catch Exception e
(log/error e "Error processing status update on" name))))]
(.submitEvent parallel-watch-queue event shardNum))
(catch Exception e
(log/error e "Error processing status update")))))
(log/error e "Error submitting pod status update")))))

(defn task-ents->map-by-task-id
"Given seq of task entities from datomic, generate a map of task-id -> entity."
@@ -408,7 +416,8 @@
compute-cluster-launch-rate-limiter cook-pool-taint-name cook-pool-taint-prefix
cook-pool-taint2-name cook-pool-taint2-value
cook-pool-label-name cook-pool-label-prefix
controller-lock-objects kill-lock-object]
controller-lock-objects kill-lock-object
parallel-watch-queue]
cc/ComputeCluster
(launch-tasks [this pool-name matches process-task-post-launch-fn]
(let [task-metadata-seq (mapcat :task-metadata-seq matches)]
@@ -886,6 +895,8 @@
name
namespace
node-blocklist-labels
parallel-watch-max-outstanding
parallel-watch-shards
read-timeout-seconds
scan-frequency-seconds
state
@@ -906,6 +917,8 @@
state :running
state-locked? false
use-google-service-account? true
parallel-watch-max-outstanding 1000
parallel-watch-shards 200
cook-pool-taint-prefix ""
cook-pool-label-prefix ""
use-token-refreshing-authenticator? false}
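For illustration, a hypothetical compute-cluster config fragment showing where the two new settings plug in. Only :parallel-watch-max-outstanding and :parallel-watch-shards (with defaults 1000 and 200) come from this change; the other keys and values are assumptions based on the destructuring above, not a real deployment.

;; Hypothetical fragment of a compute-cluster config map.
{:name "k8s-alpha"                          ; illustrative cluster name
 :namespace "cook"                          ; illustrative namespace
 :parallel-watch-max-outstanding 1000       ; block event submission past this many queued events
 :parallel-watch-shards 200}                ; pod events sharded across this many shards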
@@ -963,6 +976,7 @@
(with-meta (vec (repeatedly lock-shard-count #(ReentrantLock.)))
{:json-value (str "<count of " lock-shard-count " ReentrantLocks>")})
; cluster-level kill-lock. See cc/kill-lock-object
(ReentrantReadWriteLock. true))]
(ReentrantReadWriteLock. true)
(ParallelWatchQueue. controller-executor-service parallel-watch-max-outstanding parallel-watch-shards))]
(cc/register-compute-cluster! compute-cluster)
compute-cluster))
4 changes: 3 additions & 1 deletion scheduler/src/cook/test/testutil.clj
@@ -43,6 +43,7 @@
[ring.middleware.params :refer [wrap-params]])
(:import (com.google.common.cache CacheBuilder)
(com.netflix.fenzo SimpleAssignmentResult)
(com.twosigma.cook.kubernetes ParallelWatchQueue)
(io.kubernetes.client.custom Quantity Quantity$Format)
(io.kubernetes.client.openapi.models V1Container V1Node V1NodeSpec V1NodeStatus V1ObjectMeta V1Pod V1PodSpec V1ResourceRequirements V1Taint)
(java.util.concurrent Executors TimeUnit)
@@ -661,4 +662,5 @@
"some-random-label-A"
"some-random-label-val-B"
(repeatedly 16 #(ReentrantLock.))
(ReentrantReadWriteLock. true))))
(ReentrantReadWriteLock. true)
(ParallelWatchQueue. (Executors/newSingleThreadExecutor) 1000 100))))
10 changes: 7 additions & 3 deletions scheduler/test/cook/test/kubernetes/compute_cluster.clj
@@ -12,6 +12,7 @@
[cook.test.testutil :as tu]
[datomic.api :as d])
(:import (clojure.lang ExceptionInfo)
(com.twosigma.cook.kubernetes ParallelWatchQueue)
(io.kubernetes.client.openapi.models V1NodeSelectorRequirement V1Pod V1PodSecurityContext)
(java.util.concurrent Executors)
(java.util.concurrent.locks ReentrantLock ReentrantReadWriteLock)
@@ -80,7 +81,8 @@
{} (atom :running) (atom false) false
cook.rate-limit/AllowAllRateLimiter "t-a" "p-a" "t2-a" "p2-a" "l-p" "l-v1"
(repeatedly 16 #(ReentrantLock.))
(ReentrantReadWriteLock. true))
(ReentrantReadWriteLock. true)
(ParallelWatchQueue. (Executors/newSingleThreadExecutor) 1000 10))
task-metadata (task/TaskAssignmentResult->task-metadata (d/db conn)
nil
compute-cluster
@@ -103,7 +105,8 @@
{} (atom :running) (atom false) false
cook.rate-limit/AllowAllRateLimiter "t-b" "p-b" "t2-a" "p2-a" "l-p" "l-v2"
(repeatedly 16 #(ReentrantLock.))
(ReentrantReadWriteLock. true))
(ReentrantReadWriteLock. true)
(ParallelWatchQueue. (Executors/newSingleThreadExecutor) 1000 10))
task-metadata (task/TaskAssignmentResult->task-metadata (d/db conn)
nil
compute-cluster
@@ -134,7 +137,8 @@
{} (atom :running) (atom false) false
cook.rate-limit/AllowAllRateLimiter "t-c" "p-c" "t2-a" "p2-a" "l-p" "l-c2"
(repeatedly 16 #(ReentrantLock.))
(ReentrantReadWriteLock. true))
(ReentrantReadWriteLock. true)
(ParallelWatchQueue. (Executors/newSingleThreadExecutor) 1000 10))
node-name->node {"nodeA" (tu/node-helper "nodeA" 1.0 1000.0 10 "nvidia-tesla-p100" nil nil)
"nodeB" (tu/node-helper "nodeB" 1.0 1000.0 25 "nvidia-tesla-p100" nil nil)
"nodeC" (tu/node-helper "nodeC" 1.0 1000.0 nil nil nil nil)

