Skip to content

Commit

Permalink
concord-server: replace synchronized with locks (#1060)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibodrov authored Jan 5, 2025
1 parent 334bd77 commit 2dfca4f
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import javax.inject.Inject;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public final class ConcordServer {

Expand All @@ -45,7 +47,7 @@ public final class ConcordServer {
@Inject
private HttpServer server;

private final Object controlMutex = new Object();
private final Lock controlMutex = new ReentrantLock();

public static ConcordServer withModules(Module... modules) throws Exception {
return withModules(List.of(modules));
Expand All @@ -65,15 +67,19 @@ public static ConcordServer withModules(Collection<Module> modules) throws Excep
}

public ConcordServer start() throws Exception {
synchronized (controlMutex) {
controlMutex.lock();
try {
tasks.start();
server.start();
} finally {
controlMutex.unlock();
}
return this;
}

public void stop() throws Exception {
synchronized (controlMutex) {
controlMutex.lock();
try {
if (server != null) {
server.stop();
server = null;
Expand All @@ -83,6 +89,8 @@ public void stop() throws Exception {
tasks.stop();
tasks = null;
}
} finally {
controlMutex.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,34 @@

import javax.inject.Inject;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BackgroundTasks {

private final Set<BackgroundTask> tasks;
private final Lock controlMutex = new ReentrantLock();

@Inject
public BackgroundTasks(Set<BackgroundTask> tasks) {
this.tasks = tasks;
this.tasks = Set.copyOf(tasks);
}

public void start() {
synchronized (tasks) {
controlMutex.lock();
try {
tasks.forEach(BackgroundTask::start);
} finally {
controlMutex.unlock();
}
}

public void stop() {
synchronized (tasks) {
controlMutex.lock();
try {
tasks.forEach(BackgroundTask::stop);
} finally {
controlMutex.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

import javax.inject.Inject;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class WorkerMetrics implements BackgroundTask {

Expand All @@ -45,21 +47,28 @@ public WorkerMetrics(WorkerMetricsConfiguration cfg, AgentManager agentManager)
String prop = cfg.getGroupByCapabilitiesProperty();
String[] path = prop.split("\\.");

// retain the metric's keys
// if a certain flavor of workers disappears we'd want to show zero in the metric instead of no metric at all
Set<Object> keys = Collections.synchronizedSet(new HashSet<>());
// "persist" some of the metrics
// if some "flavor" of agents disappear, we would want to show zero in the metric instead of no metric at all
// we keep keys of all agent metrics in memory
Set<Object> keys = new HashSet<>();
Lock mutex = new ReentrantLock();

collector = new Collector() {
@Override
public List<MetricFamilySamples> collect() {
Collection<AgentWorkerEntry> data = agentManager.getAvailableAgents();

Map<Object, Long> currentData = AgentWorkerUtils.groupBy(data, path);
keys.addAll(currentData.keySet());

Map<Object, Long> m = new HashMap<>();
keys.forEach(k -> m.put(k, 0L));
m.putAll(currentData);

Map<Object, Long> currentData = AgentWorkerUtils.groupBy(data, path);
mutex.lock();
try {
keys.addAll(currentData.keySet());
keys.forEach(k -> m.put(k, 0L));
m.putAll(currentData);
} finally {
mutex.unlock();
}

GaugeMetricFamily f = new GaugeMetricFamily("available_workers", "number of available workers for the " + prop, Collections.singletonList("prop"));
m.forEach((k, v) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

public final class KeyPairUtils {
Expand All @@ -37,15 +39,18 @@ public final class KeyPairUtils {
private static final String DEFAULT_KEY_COMMENT = "concord-server";

private static final JSch jsch = new JSch();
private static final Lock mutex = new ReentrantLock();

public static KeyPair create(int keySize) {
com.jcraft.jsch.KeyPair k;
synchronized (jsch) {
try {
k = com.jcraft.jsch.KeyPair.genKeyPair(jsch, DEFAULT_KEY_TYPE, keySize);
} catch (JSchException e) {
throw new SecurityException(e);
}

mutex.lock();
try {
k = com.jcraft.jsch.KeyPair.genKeyPair(jsch, DEFAULT_KEY_TYPE, keySize);
} catch (JSchException e) {
throw new SecurityException(e);
} finally {
mutex.unlock();
}

byte[] publicKey = array(out -> k.writePublicKey(out, DEFAULT_KEY_COMMENT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

Expand All @@ -53,7 +54,7 @@ public class PolicyCache implements BackgroundTask {

private final ObjectMapper objectMapper;
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Object refreshMutex = new Object();
private final Lock refreshMutex = new ReentrantLock();

private final PolicyCacheConfiguration cacheCfg;
private final Dao dao;
Expand Down Expand Up @@ -93,9 +94,12 @@ public void refresh() {
try {
reloadPolicies();
} catch (Exception e) {
synchronized (refreshMutex) {
refreshMutex.lock();
try {
lastRefreshRequestAt = System.currentTimeMillis();
refreshMutex.notifyAll();
} finally {
refreshMutex.unlock();
}
}
}
Expand Down Expand Up @@ -167,12 +171,15 @@ private void run() {
long now = System.currentTimeMillis();
reloadPolicies();

synchronized (refreshMutex) {
refreshMutex.lock();
try {
if (lastRefreshRequestAt > now) {
lastRefreshRequestAt = now;
} else {
refreshMutex.wait(cacheCfg.getReloadInterval().toMillis());
}
} finally {
refreshMutex.unlock();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;

/**
Expand All @@ -31,13 +33,15 @@ public final class KeyIndex<K extends Key<?>> {

private final Map<String, K> keys = new HashMap<>();
private final BiFunction<String, Class<?>, K> keyMaker;
private final Lock mutex = new ReentrantLock();

public KeyIndex(BiFunction<String, Class<?>, K> keyMaker) {
this.keyMaker = keyMaker;
}

public K register(String name, Class<?> type) {
synchronized (keys) {
mutex.lock();
try {
if (keys.containsKey(name)) {
throw new IllegalStateException("Key '" + name + "' is already registered. " +
"Check for duplicate declarations in the code");
Expand All @@ -46,6 +50,8 @@ public K register(String name, Class<?> type) {
K k = keyMaker.apply(name, type);
keys.put(name, k);
return k;
} finally {
mutex.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static com.walmartlabs.concord.server.jooq.Tables.REPOSITORIES;
Expand All @@ -70,6 +72,8 @@ public class EnqueuedBatchTask extends PeriodicTask {
private final List<String> inflightRepoUrls;
private final AtomicInteger freeWorkersCount;

private final Lock mutex = new ReentrantLock();

@Inject
public EnqueuedBatchTask(Dao dao,
EnqueueWorkersConfiguration cfg,
Expand All @@ -85,21 +89,28 @@ public EnqueuedBatchTask(Dao dao,

this.queue = new ArrayBlockingQueue<>(cfg.getWorkersCount());
this.freeWorkersCount = new AtomicInteger(cfg.getWorkersCount());
this.inflightRepoUrls = Collections.synchronizedList(new ArrayList<>(cfg.getWorkersCount()));
this.inflightRepoUrls = new ArrayList<>(cfg.getWorkersCount());

this.executor = Executors.newFixedThreadPool(cfg.getWorkersCount());
for (int i = 0; i < cfg.getWorkersCount(); i++) {
this.executor.submit(new Worker(pipeline, repositoryManager, queue));
}

metricRegistry.gauge("enqueued-workers-available", () -> freeWorkersCount::get);
metricRegistry.gauge("enqueued-inflight-urls", () -> inflightRepoUrls::size);
metricRegistry.gauge("enqueued-inflight-urls", () -> () -> {
mutex.lock();
try {
return inflightRepoUrls.size();
} finally {
mutex.unlock();
}
});
}

@Override
public void stop() {
super.stop();

executor.shutdownNow();

try {
Expand All @@ -117,12 +128,7 @@ protected boolean performTask() {
}

int limit = Math.min(freeWorkersCount.get(), cfg.getWorkersCount());

List<String> ignoreRepoUrls;
synchronized (inflightRepoUrls) {
ignoreRepoUrls = new ArrayList<>(inflightRepoUrls);
}

List<String> ignoreRepoUrls = copyInFlightRepos();
Collection<Batch> batches = dao.poll(ignoreRepoUrls, limit);
if (batches.isEmpty()) {
return false;
Expand All @@ -141,10 +147,15 @@ protected boolean performTask() {
freeWorkersCount.decrementAndGet();
}

for (Batch b : batches) {
if (b.repoUrl() != null) {
inflightRepoUrls.add(b.repoUrl());
mutex.lock();
try {
for (Batch b : batches) {
if (b.repoUrl() != null) {
inflightRepoUrls.add(b.repoUrl());
}
}
} finally {
mutex.unlock();
}

queue.addAll(batches);
Expand All @@ -155,7 +166,21 @@ protected boolean performTask() {
private void onWorkerFree(String repoUrl) {
freeWorkersCount.incrementAndGet();
if (repoUrl != null) {
inflightRepoUrls.remove(repoUrl);
mutex.lock();
try {
inflightRepoUrls.remove(repoUrl);
} finally {
mutex.unlock();
}
}
}

private List<String> copyInFlightRepos() {
mutex.lock();
try {
return List.copyOf(inflightRepoUrls);
} finally {
mutex.unlock();
}
}

Expand Down Expand Up @@ -405,7 +430,7 @@ private static Collection<Batch> removeDuplicateUrls(Collection<Batch> batches)
for (Batch b : batches) {
if (b.repoUrl() == null) {
result.add(b);
} else if (!repoUrls.contains(b.repoUrl())){
} else if (!repoUrls.contains(b.repoUrl())) {
result.add(b);
repoUrls.add(b.repoUrl());
}
Expand Down
Loading

0 comments on commit 2dfca4f

Please sign in to comment.