Skip to content

Commit

Permalink
Move the postponing-invalidation logic from WorkerLifecycleManager to…
Browse files Browse the repository at this point in the history
… WorkerPoolImpl.

All idle workers in WokerPoolImpl can be killed, so the postponing validation logic should be placed inside the WorkerPool instead of WorkerLifecycleManager.

PiperOrigin-RevId: 717532651
Change-Id: I51fbe92350b18056fc35c1953f01641879ef8b84
  • Loading branch information
bigelephant29 authored and copybara-github committed Jan 20, 2025
1 parent 4ee1788 commit 01cb3cd
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 162 deletions.
1 change: 1 addition & 0 deletions src/main/java/com/google/devtools/build/lib/worker/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ java_library(
":worker",
":worker_factory",
":worker_key",
":worker_options",
":worker_pool",
":worker_pool_config",
":worker_process_status",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.flogger.GoogleLogger;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.worker.WorkerProcessStatus.Status;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -215,51 +214,6 @@ void evictWorkers(ImmutableList<WorkerProcessMetrics> workerProcessMetrics)

emptyEvictionWasLogged = candidates.isEmpty();
}

// TODO(b/300067854): Shrinking of the worker pool happens on worker keys that are active at the
// time of polling, but doesn't shrink the pools of idle workers. We might be wrongly
// penalizing lower memory usage workers (but more active) by shrinking their pool sizes
// instead of higher memory usage workers (but less active) and are killed directly with
// {@code #evictCandidates()} (where shrinking doesn't happen).
if (options.shrinkWorkerPool) {
List<WorkerProcessMetrics> notEvictedWorkerProcessMetrics =
workerProcessMetrics.stream()
.filter(metric -> !evictedWorkers.containsAll(metric.getWorkerIds()))
.collect(Collectors.toList());

int notEvictedWorkerMemoryUsageKb =
notEvictedWorkerProcessMetrics.stream()
.mapToInt(WorkerProcessMetrics::getUsedMemoryInKb)
.sum();

if (notEvictedWorkerMemoryUsageKb <= options.totalWorkerMemoryLimitMb * 1000) {
return;
}

postponeInvalidation(notEvictedWorkerProcessMetrics, notEvictedWorkerMemoryUsageKb);
}
}

private void postponeInvalidation(
List<WorkerProcessMetrics> workerProcessMetrics, int notEvictedWorkerMemoryUsageKb) {
ImmutableSet<WorkerProcessMetrics> potentialCandidates =
getCandidates(
workerProcessMetrics, options.totalWorkerMemoryLimitMb, notEvictedWorkerMemoryUsageKb);

if (!potentialCandidates.isEmpty()) {
String msg =
String.format(
"Postponing eviction of worker ids: %s",
potentialCandidates.stream()
.flatMap(m -> m.getWorkerIds().stream())
.collect(toImmutableList()));
logger.atInfo().log("%s", msg);
if (options.workerVerbose && this.reporter != null) {
reporter.handle(Event.info(msg));
}
potentialCandidates.forEach(
m -> m.getStatus().maybeUpdateStatus(Status.PENDING_KILL_DUE_TO_MEMORY_PRESSURE));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void buildStarting(BuildStartingEvent event) {
}

if (workerPool == null) {
workerPool = new WorkerPoolImpl(workerFactory, newConfig);
workerPool = new WorkerPoolImpl(workerFactory, newConfig, options);
config = newConfig;
// If workerPool is restarted then we should recreate metrics.
WorkerProcessMetricsCollector.instance().clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
/**
* Implementation of the WorkerPool.
*
* <p>TODO(b/323880131): Remove documentation once we completely remove the legacy implementation.
*
* <p>This implementation flattens this to have a single {@code WorkerKeyPool} for each worker key
* (we don't need the indirection in referencing both mnemonic and worker key since the mnemonic is
* part of the key). Additionally, it bakes in pool shrinking logic so that we can handle concurrent
Expand All @@ -58,14 +56,18 @@ public class WorkerPoolImpl implements WorkerPool {

private final ImmutableMap<String, Integer> singleplexMaxInstances;
private final ImmutableMap<String, Integer> multiplexMaxInstances;

private final WorkerOptions options;

private final ConcurrentHashMap<WorkerKey, WorkerKeyPool> pools = new ConcurrentHashMap<>();

public WorkerPoolImpl(WorkerFactory factory, WorkerPoolConfig config) {
public WorkerPoolImpl(WorkerFactory factory, WorkerPoolConfig config, WorkerOptions options) {
this.factory = factory;
this.singleplexMaxInstances =
getMaxInstances(config.getWorkerMaxInstances(), DEFAULT_MAX_SINGLEPLEX_WORKERS);
this.multiplexMaxInstances =
getMaxInstances(config.getWorkerMaxMultiplexInstances(), DEFAULT_MAX_MULTIPLEX_WORKERS);
this.options = options;
}

private static ImmutableMap<String, Integer> getMaxInstances(
Expand Down Expand Up @@ -240,10 +242,11 @@ private synchronized Set<Integer> evictWorkers(Set<Integer> workerIdsToEvict) {
worker.getWorkerKey().getMnemonic(),
worker.getWorkerId(),
worker.getWorkerKey().hashCode());
} else if (options.shrinkWorkerPool) {
String msg = String.format("Postponing eviction of worker id: %s", worker.getWorkerId());
logger.atInfo().log("%s", msg);
worker.getStatus().maybeUpdateStatus(Status.PENDING_KILL_DUE_TO_MEMORY_PRESSURE);
}
// TODO(b/323880131): Move postponing of invalidation from {@code WorkerLifecycleManager}
// here, since all we need to do is to update the statuses. We keep it like this for now
// to preserve the existing behavior.
}
return evictedWorkerIds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public final class WorkerLifecycleManagerTest {
private static final long PROCESS_ID_2 = 2L;
private static final long PROCESS_ID_3 = 3L;
private static final long PROCESS_ID_4 = 4L;
private static final long PROCESS_ID_5 = 5L;

private int workerIds = 1;

Expand Down Expand Up @@ -110,7 +109,9 @@ public void setUp() throws Exception {
public void testEvictWorkers_doNothing_lowMemoryUsage() throws Exception {
WorkerPoolImpl workerPool =
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 1), emptyEntryList()));
factoryMock,
new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 1), emptyEntryList()),
options);
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowWorker(key);
workerPool.returnWorker(key, w1);
Expand All @@ -136,7 +137,9 @@ public void testEvictWorkers_doNothing_lowMemoryUsage() throws Exception {
public void testEvictWorkers_doNothing_zeroThreshold() throws Exception {
WorkerPoolImpl workerPool =
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 1), emptyEntryList()));
factoryMock,
new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 1), emptyEntryList()),
options);
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowWorker(key);
workerPool.returnWorker(key, w1);
Expand All @@ -163,7 +166,9 @@ public void testEvictWorkers_doNothing_zeroThreshold() throws Exception {
public void testEvictWorkers_doNothing_emptyMetrics() throws Exception {
WorkerPoolImpl workerPool =
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 1), emptyEntryList()));
factoryMock,
new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 1), emptyEntryList()),
options);
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowWorker(key);
workerPool.returnWorker(key, w1);
Expand All @@ -189,7 +194,9 @@ public void testEvictWorkers_doNothing_emptyMetrics() throws Exception {
public void testGetEvictionCandidates_selectOnlyWorker() throws Exception {
WorkerPoolImpl workerPool =
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 1), emptyEntryList()));
factoryMock,
new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 1), emptyEntryList()),
options);
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowWorker(key);
workerPool.returnWorker(key, w1);
Expand All @@ -216,7 +223,9 @@ public void testGetEvictionCandidates_selectOnlyWorker() throws Exception {
public void testGetEvictionCandidates_evictLargestWorkers() throws Exception {
WorkerPoolImpl workerPool =
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 3), emptyEntryList()));
factoryMock,
new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 3), emptyEntryList()),
options);
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowWorker(key);
Worker w2 = workerPool.borrowWorker(key);
Expand Down Expand Up @@ -256,7 +265,9 @@ public void testGetEvictionCandidates_numberOfWorkersIsMoreThanDefaultNumTests()
throws Exception {
WorkerPoolImpl workerPool =
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 4), emptyEntryList()));
factoryMock,
new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 4), emptyEntryList()),
options);
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowWorker(key);
Worker w2 = workerPool.borrowWorker(key);
Expand Down Expand Up @@ -296,7 +307,9 @@ public void testGetEvictionCandidates_evictWorkerWithSameMenmonicButDifferentKey
throws Exception {
WorkerPoolImpl workerPool =
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 3), emptyEntryList()));
factoryMock,
new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 3), emptyEntryList()),
options);
WorkerKey key1 = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
WorkerKey key2 = createWorkerKey(DUMMY_MNEMONIC, fileSystem, true);

Expand Down Expand Up @@ -339,7 +352,9 @@ public void testGetEvictionCandidates_evictWorkerWithSameMenmonicButDifferentKey
public void testGetEvictionCandidates_evictOnlyIdleWorkers() throws Exception {
WorkerPoolImpl workerPool =
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 3), emptyEntryList()));
factoryMock,
new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 3), emptyEntryList()),
options);
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowWorker(key);
Worker w2 = workerPool.borrowWorker(key);
Expand Down Expand Up @@ -379,7 +394,8 @@ public void testGetEvictionCandidates_evictDifferentWorkerKeys() throws Exceptio
WorkerPoolImpl workerPool =
new WorkerPoolImpl(
factoryMock,
new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 2, "smart", 2), emptyEntryList()));
new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 2, "smart", 2), emptyEntryList()),
options);
WorkerKey key1 = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
WorkerKey key2 = createWorkerKey("smart", fileSystem);
Worker w1 = workerPool.borrowWorker(key1);
Expand Down Expand Up @@ -425,106 +441,13 @@ public void testGetEvictionCandidates_evictDifferentWorkerKeys() throws Exceptio
assertThat(w4.getStatus().isValid()).isTrue();
}

@Test
public void testGetEvictionCandidates_testDoomedWorkers() throws Exception {
WorkerPoolImpl workerPool =
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 2), emptyEntryList()));
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowWorker(key);
Worker w2 = workerPool.borrowWorker(key);

ImmutableList<WorkerProcessMetrics> workerMetrics =
ImmutableList.of(
createWorkerMetric(w1, PROCESS_ID_1, /* memoryInKb= */ 2000),
createWorkerMetric(w2, PROCESS_ID_2, /* memoryInKb= */ 2000));

WorkerOptions options = new WorkerOptions();
options.totalWorkerMemoryLimitMb = 1;
options.shrinkWorkerPool = true;

WorkerLifecycleManager manager = new WorkerLifecycleManager(workerPool, options);

assertThat(workerPool.getIdleWorkers()).isEmpty();
assertThat(workerPool.getNumActive(key)).isEqualTo(2);
assertThat(w1.getStatus().isValid()).isTrue();
assertThat(w2.getStatus().isValid()).isTrue();

manager.evictWorkers(workerMetrics);

assertThat(w1.getStatus().get()).isEqualTo(Status.PENDING_KILL_DUE_TO_MEMORY_PRESSURE);
assertThat(w2.getStatus().get()).isEqualTo(Status.PENDING_KILL_DUE_TO_MEMORY_PRESSURE);

// Return only one worker.
workerPool.returnWorker(key, w1);

// w1 gets destroyed when it is returned, so there are 0 idle workers.
assertThat(workerPool.getIdleWorkers()).isEmpty();
assertThat(workerPool.getNumActive(key)).isEqualTo(1);
// Since w1 is already returned, it is killed on return.
assertThat(w1.getStatus().get()).isEqualTo(Status.KILLED_DUE_TO_MEMORY_PRESSURE);
// Since w2 is still active, it is marked to be killed which will happen when it is returned.
assertThat(w2.getStatus().get()).isEqualTo(Status.PENDING_KILL_DUE_TO_MEMORY_PRESSURE);

// Return the remaining worker.
workerPool.returnWorker(key, w2);
assertThat(w2.getStatus().get()).isEqualTo(Status.KILLED_DUE_TO_MEMORY_PRESSURE);
}

@Test
public void testGetEvictionCandidates_testDoomedAndIdleWorkers() throws Exception {
WorkerPoolImpl workerPool =
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 5), emptyEntryList()));
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowWorker(key);
Worker w2 = workerPool.borrowWorker(key);
Worker w3 = workerPool.borrowWorker(key);
Worker w4 = workerPool.borrowWorker(key);
Worker w5 = workerPool.borrowWorker(key);
workerPool.returnWorker(key, w1);
workerPool.returnWorker(key, w2);

ImmutableList<WorkerProcessMetrics> workerMetrics =
ImmutableList.of(
createWorkerMetric(w1, PROCESS_ID_1, /* memoryInKb= */ 2000),
createWorkerMetric(w2, PROCESS_ID_2, /* memoryInKb= */ 1000),
createWorkerMetric(w3, PROCESS_ID_3, /* memoryInKb= */ 4000),
createWorkerMetric(w4, PROCESS_ID_4, /* memoryInKb= */ 5000),
createWorkerMetric(w5, PROCESS_ID_5, /* memoryInKb= */ 1000));

WorkerOptions options = new WorkerOptions();
options.totalWorkerMemoryLimitMb = 2;
options.shrinkWorkerPool = true;

WorkerLifecycleManager manager = new WorkerLifecycleManager(workerPool, options);

assertThat(workerPool.getIdleWorkers()).hasSize(2);
assertThat(workerPool.getNumActive(key)).isEqualTo(3);
assertThat(w1.getStatus().isValid()).isTrue();
assertThat(w2.getStatus().isValid()).isTrue();
assertThat(w3.getStatus().isValid()).isTrue();
assertThat(w4.getStatus().isValid()).isTrue();
assertThat(w5.getStatus().isValid()).isTrue();

manager.evictWorkers(workerMetrics);

assertThat(workerPool.getIdleWorkers()).isEmpty();
assertThat(workerPool.getNumActive(key)).isEqualTo(3);
// w1 and w2 are killed immediately.
assertThat(w1.getStatus().get()).isEqualTo(Status.KILLED_DUE_TO_MEMORY_PRESSURE);
assertThat(w2.getStatus().get()).isEqualTo(Status.KILLED_DUE_TO_MEMORY_PRESSURE);
// w3 and w4 are killed only when returned.
assertThat(w3.getStatus().get()).isEqualTo(Status.PENDING_KILL_DUE_TO_MEMORY_PRESSURE);
assertThat(w4.getStatus().get()).isEqualTo(Status.PENDING_KILL_DUE_TO_MEMORY_PRESSURE);
assertThat(w5.getStatus().isValid()).isTrue();
}

@Test
public void evictWorkers_testMultiplexWorkers() throws Exception {
WorkerPoolImpl workerPool =
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(emptyEntryList(), entryList(DUMMY_MNEMONIC, 2)));
factoryMock,
new WorkerPoolConfig(emptyEntryList(), entryList(DUMMY_MNEMONIC, 2)),
options);
WorkerKey key =
createWorkerKey(DUMMY_MNEMONIC, fileSystem, /* multiplex= */ true, /* sandboxed= */ false);
Worker w1 = workerPool.borrowWorker(key);
Expand Down Expand Up @@ -555,7 +478,9 @@ public void evictWorkers_testMultiplexWorkers() throws Exception {
public void evictWorkers_doomMultiplexWorker() throws Exception {
WorkerPoolImpl workerPool =
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(emptyEntryList(), entryList(DUMMY_MNEMONIC, 2)));
factoryMock,
new WorkerPoolConfig(emptyEntryList(), entryList(DUMMY_MNEMONIC, 2)),
options);
WorkerKey key =
createWorkerKey(DUMMY_MNEMONIC, fileSystem, /* multiplex= */ true, /* sandboxed= */ false);
Worker w1 = workerPool.borrowWorker(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

/** Tests WorkerPool. */
@RunWith(JUnit4.class)
public class WorkerPoolTest {
public class WorkerPoolImplTest {

public static final FileSystem fileSystem =
new InMemoryFileSystem(BlazeClock.instance(), DigestHashFunction.SHA256);
Expand Down Expand Up @@ -68,7 +68,8 @@ public void setUp() throws Exception {
new WorkerPoolConfig(
/* workerMaxInstances= */ ImmutableList.of(Maps.immutableEntry("mnem", 2)),
/* workerMaxMultiplexInstances= */ ImmutableList.of(
Maps.immutableEntry("mnem", 2))));
Maps.immutableEntry("mnem", 2))),
options);
doAnswer(
arg ->
new TestWorker(
Expand Down Expand Up @@ -294,6 +295,22 @@ public void testEvict_doesNotEvictActiveWorkers() throws Exception {
assertThat(workerPool.getNumActive(workerKey)).isEqualTo(1);
}

@Test
public void testEvict_postponeInvalidationOfIdleWorkers() throws Exception {
// The postpone-invalidation should only work when shrinking is enabled.
options.shrinkWorkerPool = true;
WorkerKey workerKey = createWorkerKey(fileSystem, "mnem", false);
Worker worker1 = workerPool.borrowWorker(workerKey);
Worker worker2 = workerPool.borrowWorker(workerKey);
workerPool.returnWorker(workerKey, worker1);
workerPool.returnWorker(workerKey, worker2);
ImmutableSet<Integer> evicted = workerPool.evictWorkers(ImmutableSet.of(worker1.getWorkerId()));
assertThat(evicted).containsExactly(worker1.getWorkerId());
// Worker2 does not get evicted because it's not passed in the eviction list, but it should be
// marked with `Status.PENDING_KILL_DUE_TO_MEMORY_PRESSURE`.
assertThat(worker2.getStatus().get()).isEqualTo(Status.PENDING_KILL_DUE_TO_MEMORY_PRESSURE);
}

@Test
public void testGetIdleWorkers() throws Exception {
WorkerKey workerKey = createWorkerKey(fileSystem, "mnem", false);
Expand Down

0 comments on commit 01cb3cd

Please sign in to comment.