diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/ConcordServer.java b/server/impl/src/main/java/com/walmartlabs/concord/server/ConcordServer.java index bbf0f63547..d0da08892e 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/ConcordServer.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/ConcordServer.java @@ -31,6 +31,8 @@ import javax.inject.Inject; import java.util.Collection; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public final class ConcordServer { @@ -45,7 +47,7 @@ public final class ConcordServer { @Inject private HttpServer server; - private final Object controlMutex = new Object(); + private final Lock controlMutex = new ReentrantLock(); public static ConcordServer withModules(Module... modules) throws Exception { return withModules(List.of(modules)); @@ -65,15 +67,19 @@ public static ConcordServer withModules(Collection modules) throws Excep } public ConcordServer start() throws Exception { - synchronized (controlMutex) { + controlMutex.lock(); + try { tasks.start(); server.start(); + } finally { + controlMutex.unlock(); } return this; } public void stop() throws Exception { - synchronized (controlMutex) { + controlMutex.lock(); + try { if (server != null) { server.stop(); server = null; @@ -83,6 +89,8 @@ public void stop() throws Exception { tasks.stop(); tasks = null; } + } finally { + controlMutex.unlock(); } } diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/boot/BackgroundTasks.java b/server/impl/src/main/java/com/walmartlabs/concord/server/boot/BackgroundTasks.java index 875a2bb483..419f952c9a 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/boot/BackgroundTasks.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/boot/BackgroundTasks.java @@ -24,25 +24,34 @@ import javax.inject.Inject; import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public class BackgroundTasks { private final Set tasks; + private final Lock controlMutex = new ReentrantLock(); @Inject public BackgroundTasks(Set tasks) { - this.tasks = tasks; + this.tasks = Set.copyOf(tasks); } public void start() { - synchronized (tasks) { + controlMutex.lock(); + try { tasks.forEach(BackgroundTask::start); + } finally { + controlMutex.unlock(); } } public void stop() { - synchronized (tasks) { + controlMutex.lock(); + try { tasks.forEach(BackgroundTask::stop); + } finally { + controlMutex.unlock(); } } diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/metrics/WorkerMetrics.java b/server/impl/src/main/java/com/walmartlabs/concord/server/metrics/WorkerMetrics.java index aa06ec2d79..f4f225d7aa 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/metrics/WorkerMetrics.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/metrics/WorkerMetrics.java @@ -33,6 +33,8 @@ import javax.inject.Inject; import java.util.*; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public class WorkerMetrics implements BackgroundTask { @@ -45,21 +47,28 @@ public WorkerMetrics(WorkerMetricsConfiguration cfg, AgentManager agentManager) String prop = cfg.getGroupByCapabilitiesProperty(); String[] path = prop.split("\\."); - // retain the metric's keys - // if a certain flavor of workers disappears we'd want to show zero in the metric instead of no metric at all - Set keys = Collections.synchronizedSet(new HashSet<>()); + // "persist" some of the metrics + // if some "flavor" of agents disappear, we would want to show zero in the metric instead of no metric at all + // we keep keys of all agent metrics in memory + Set keys = new HashSet<>(); + Lock mutex = new ReentrantLock(); collector = new Collector() { @Override public List collect() { Collection data = agentManager.getAvailableAgents(); - Map currentData = AgentWorkerUtils.groupBy(data, path); - keys.addAll(currentData.keySet()); - Map m = new HashMap<>(); - keys.forEach(k -> m.put(k, 0L)); - m.putAll(currentData); + + Map currentData = AgentWorkerUtils.groupBy(data, path); + mutex.lock(); + try { + keys.addAll(currentData.keySet()); + keys.forEach(k -> m.put(k, 0L)); + m.putAll(currentData); + } finally { + mutex.unlock(); + } GaugeMetricFamily f = new GaugeMetricFamily("available_workers", "number of available workers for the " + prop, Collections.singletonList("prop")); m.forEach((k, v) -> { diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/org/secret/KeyPairUtils.java b/server/impl/src/main/java/com/walmartlabs/concord/server/org/secret/KeyPairUtils.java index 3af96255aa..368eb3e446 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/org/secret/KeyPairUtils.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/org/secret/KeyPairUtils.java @@ -29,6 +29,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; public final class KeyPairUtils { @@ -37,15 +39,18 @@ public final class KeyPairUtils { private static final String DEFAULT_KEY_COMMENT = "concord-server"; private static final JSch jsch = new JSch(); + private static final Lock mutex = new ReentrantLock(); public static KeyPair create(int keySize) { com.jcraft.jsch.KeyPair k; - synchronized (jsch) { - try { - k = com.jcraft.jsch.KeyPair.genKeyPair(jsch, DEFAULT_KEY_TYPE, keySize); - } catch (JSchException e) { - throw new SecurityException(e); - } + + mutex.lock(); + try { + k = com.jcraft.jsch.KeyPair.genKeyPair(jsch, DEFAULT_KEY_TYPE, keySize); + } catch (JSchException e) { + throw new SecurityException(e); + } finally { + mutex.unlock(); } byte[] publicKey = array(out -> k.writePublicKey(out, DEFAULT_KEY_COMMENT)); diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/policy/PolicyCache.java b/server/impl/src/main/java/com/walmartlabs/concord/server/policy/PolicyCache.java index bbadf8f344..e71c9257d7 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/policy/PolicyCache.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/policy/PolicyCache.java @@ -40,6 +40,7 @@ import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @@ -53,7 +54,7 @@ public class PolicyCache implements BackgroundTask { private final ObjectMapper objectMapper; private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); - private final Object refreshMutex = new Object(); + private final Lock refreshMutex = new ReentrantLock(); private final PolicyCacheConfiguration cacheCfg; private final Dao dao; @@ -93,9 +94,12 @@ public void refresh() { try { reloadPolicies(); } catch (Exception e) { - synchronized (refreshMutex) { + refreshMutex.lock(); + try { lastRefreshRequestAt = System.currentTimeMillis(); refreshMutex.notifyAll(); + } finally { + refreshMutex.unlock(); } } } @@ -167,12 +171,15 @@ private void run() { long now = System.currentTimeMillis(); reloadPolicies(); - synchronized (refreshMutex) { + refreshMutex.lock(); + try { if (lastRefreshRequestAt > now) { lastRefreshRequestAt = now; } else { refreshMutex.wait(cacheCfg.getReloadInterval().toMillis()); } + } finally { + refreshMutex.unlock(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/process/keys/KeyIndex.java b/server/impl/src/main/java/com/walmartlabs/concord/server/process/keys/KeyIndex.java index a12f620a35..f6b3d41cee 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/process/keys/KeyIndex.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/process/keys/KeyIndex.java @@ -22,6 +22,8 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; /** @@ -31,13 +33,15 @@ public final class KeyIndex> { private final Map keys = new HashMap<>(); private final BiFunction, K> keyMaker; + private final Lock mutex = new ReentrantLock(); public KeyIndex(BiFunction, K> keyMaker) { this.keyMaker = keyMaker; } public K register(String name, Class type) { - synchronized (keys) { + mutex.lock(); + try { if (keys.containsKey(name)) { throw new IllegalStateException("Key '" + name + "' is already registered. " + "Check for duplicate declarations in the code"); @@ -46,6 +50,8 @@ public K register(String name, Class type) { K k = keyMaker.apply(name, type); keys.put(name, k); return k; + } finally { + mutex.unlock(); } } } diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/EnqueuedBatchTask.java b/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/EnqueuedBatchTask.java index de14e2898f..d015b0bdea 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/EnqueuedBatchTask.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/EnqueuedBatchTask.java @@ -48,6 +48,8 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import static com.walmartlabs.concord.server.jooq.Tables.REPOSITORIES; @@ -70,6 +72,8 @@ public class EnqueuedBatchTask extends PeriodicTask { private final List inflightRepoUrls; private final AtomicInteger freeWorkersCount; + private final Lock mutex = new ReentrantLock(); + @Inject public EnqueuedBatchTask(Dao dao, EnqueueWorkersConfiguration cfg, @@ -85,7 +89,7 @@ public EnqueuedBatchTask(Dao dao, this.queue = new ArrayBlockingQueue<>(cfg.getWorkersCount()); this.freeWorkersCount = new AtomicInteger(cfg.getWorkersCount()); - this.inflightRepoUrls = Collections.synchronizedList(new ArrayList<>(cfg.getWorkersCount())); + this.inflightRepoUrls = new ArrayList<>(cfg.getWorkersCount()); this.executor = Executors.newFixedThreadPool(cfg.getWorkersCount()); for (int i = 0; i < cfg.getWorkersCount(); i++) { @@ -93,13 +97,20 @@ public EnqueuedBatchTask(Dao dao, } metricRegistry.gauge("enqueued-workers-available", () -> freeWorkersCount::get); - metricRegistry.gauge("enqueued-inflight-urls", () -> inflightRepoUrls::size); + metricRegistry.gauge("enqueued-inflight-urls", () -> () -> { + mutex.lock(); + try { + return inflightRepoUrls.size(); + } finally { + mutex.unlock(); + } + }); } @Override public void stop() { super.stop(); - + executor.shutdownNow(); try { @@ -117,12 +128,7 @@ protected boolean performTask() { } int limit = Math.min(freeWorkersCount.get(), cfg.getWorkersCount()); - - List ignoreRepoUrls; - synchronized (inflightRepoUrls) { - ignoreRepoUrls = new ArrayList<>(inflightRepoUrls); - } - + List ignoreRepoUrls = copyInFlightRepos(); Collection batches = dao.poll(ignoreRepoUrls, limit); if (batches.isEmpty()) { return false; @@ -141,10 +147,15 @@ protected boolean performTask() { freeWorkersCount.decrementAndGet(); } - for (Batch b : batches) { - if (b.repoUrl() != null) { - inflightRepoUrls.add(b.repoUrl()); + mutex.lock(); + try { + for (Batch b : batches) { + if (b.repoUrl() != null) { + inflightRepoUrls.add(b.repoUrl()); + } } + } finally { + mutex.unlock(); } queue.addAll(batches); @@ -155,7 +166,21 @@ protected boolean performTask() { private void onWorkerFree(String repoUrl) { freeWorkersCount.incrementAndGet(); if (repoUrl != null) { - inflightRepoUrls.remove(repoUrl); + mutex.lock(); + try { + inflightRepoUrls.remove(repoUrl); + } finally { + mutex.unlock(); + } + } + } + + private List copyInFlightRepos() { + mutex.lock(); + try { + return List.copyOf(inflightRepoUrls); + } finally { + mutex.unlock(); } } @@ -405,7 +430,7 @@ private static Collection removeDuplicateUrls(Collection batches) for (Batch b : batches) { if (b.repoUrl() == null) { result.add(b); - } else if (!repoUrls.contains(b.repoUrl())){ + } else if (!repoUrls.contains(b.repoUrl())) { result.add(b); repoUrls.add(b.repoUrl()); } diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/security/ldap/ConcordDnsSrvLdapContextFactory.java b/server/impl/src/main/java/com/walmartlabs/concord/server/security/ldap/ConcordDnsSrvLdapContextFactory.java index 4a8ff1bc18..80a3bcb3e7 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/security/ldap/ConcordDnsSrvLdapContextFactory.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/security/ldap/ConcordDnsSrvLdapContextFactory.java @@ -40,6 +40,8 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public class ConcordDnsSrvLdapContextFactory implements LdapContextFactory { @@ -51,9 +53,10 @@ public class ConcordDnsSrvLdapContextFactory implements LdapContextFactory { private static final String PORT = "3269"; private static final int MAX_RETRY = 100; - private Iterator ldapUrlIterator; - private final LdapConfiguration cfg; + private final Lock mutex = new ReentrantLock(); + + private Iterator ldapUrlIterator; /* 1. check dnsSRV and refresh SRV list @@ -70,12 +73,22 @@ public String getCurrentLdapUrl() { return ((JndiLdapContextFactory) this.delegate).getUrl(); } - public synchronized void setLdapContextFactory(LdapContextFactory ldapContextFactory) { - this.delegate = ldapContextFactory; + public void setLdapContextFactory(LdapContextFactory ldapContextFactory) { + mutex.lock(); + try { + this.delegate = ldapContextFactory; + } finally { + mutex.unlock(); + } } - public synchronized void setLdapUrlIterator(Iterator ldapUrlIterator) { - this.ldapUrlIterator = ldapUrlIterator; + public void setLdapUrlIterator(Iterator ldapUrlIterator) { + mutex.lock(); + try { + this.ldapUrlIterator = ldapUrlIterator; + } finally { + mutex.unlock(); + } } @Override @@ -180,7 +193,7 @@ private static String removeLastCharIfDot(String s) { private String resolveUrl() { if (cfg.getDnsSRVName() != null) { - String ldapUrl = getNextLdapUrl(this.ldapUrlIterator); + String ldapUrl = getNextLdapUrl(); if (ldapUrl != null) { return ldapUrl; } @@ -191,11 +204,16 @@ private String resolveUrl() { return null; } - private static String getNextLdapUrl(Iterator ldapUrlIterator) { - if (ldapUrlIterator != null && ldapUrlIterator.hasNext()) { - return ldapUrlIterator.next(); + private String getNextLdapUrl() { + mutex.lock(); + try { + if (ldapUrlIterator != null && ldapUrlIterator.hasNext()) { + return ldapUrlIterator.next(); + } + return null; + } finally { + mutex.unlock(); } - return null; } private void handleCommunicationException(Exception e) throws NamingException { @@ -207,6 +225,7 @@ private void handleCommunicationException(Exception e) throws NamingException { if (this.ldapUrlIterator != null && !this.ldapUrlIterator.hasNext()) { this.refreshSRVList(); } - this.setLdapContextFactory(getNewContextInstance(getNextLdapUrl(this.ldapUrlIterator))); + + this.setLdapContextFactory(getNewContextInstance(getNextLdapUrl())); } } diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/security/ldap/TrustingSslSocketFactory.java b/server/impl/src/main/java/com/walmartlabs/concord/server/security/ldap/TrustingSslSocketFactory.java index df3ce8b522..056a08a9db 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/security/ldap/TrustingSslSocketFactory.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/security/ldap/TrustingSslSocketFactory.java @@ -31,6 +31,8 @@ import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * A socket factory that creates SSL sockets using {@link DummyTrustManager}. @@ -39,10 +41,17 @@ */ public class TrustingSslSocketFactory extends SocketFactory { + private static final Lock mutex = new ReentrantLock(); + private final SSLSocketFactory delegate; - public static synchronized SocketFactory getDefault() { - return new TrustingSslSocketFactory(); + public static SocketFactory getDefault() { + mutex.lock(); + try { + return new TrustingSslSocketFactory(); + } finally { + mutex.unlock(); + } } public TrustingSslSocketFactory() { diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/task/TaskScheduler.java b/server/impl/src/main/java/com/walmartlabs/concord/server/task/TaskScheduler.java index 2f913aa61b..93a8c67d01 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/task/TaskScheduler.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/task/TaskScheduler.java @@ -28,10 +28,16 @@ import javax.inject.Inject; import java.time.OffsetDateTime; -import java.util.*; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import static com.walmartlabs.concord.db.PgUtils.interval; import static java.util.stream.Collectors.toMap; @@ -50,7 +56,9 @@ public class TaskScheduler extends PeriodicTask { private final ExecutorService executor; private final SchedulerDao dao; private final Map tasks; - private final Set runningTasks = Collections.synchronizedSet(new HashSet<>()); + private final Set runningTasks = new HashSet<>(); + + private final Lock mutex = new ReentrantLock(); private long lastUpdateDate; private long lasStalledCheckDate; @@ -69,7 +77,7 @@ public TaskScheduler(Set tasks, SchedulerDao dao) { @Override protected boolean performTask() { if (lastUpdateDate + RUNNING_UPDATE_INTERVAL <= System.currentTimeMillis()) { - updateRunningTasks(); + withRunningTasks(); lastUpdateDate = System.currentTimeMillis(); } @@ -109,9 +117,8 @@ private void startTask(String id) { } executor.submit(() -> { + withRunningTasks(tasks -> tasks.add(id)); try { - runningTasks.add(id); - task.performTask(); dao.success(id); @@ -122,17 +129,13 @@ private void startTask(String id) { dao.error(id, e); } finally { - runningTasks.remove(id); + withRunningTasks(tasks -> tasks.remove(id)); } }); } - private void updateRunningTasks() { - Set forUpdate; - synchronized (runningTasks) { - forUpdate = new HashSet<>(runningTasks); - } - + private void withRunningTasks() { + Set forUpdate = copyRunningTasks(); if (forUpdate.isEmpty()) { return; } @@ -160,4 +163,21 @@ private void failStalled() { } } + private void withRunningTasks(Consumer> f) { + mutex.lock(); + try { + f.accept(runningTasks); + } finally { + mutex.unlock(); + } + } + + private Set copyRunningTasks() { + mutex.lock(); + try { + return Set.copyOf(runningTasks); + } finally { + mutex.unlock(); + } + } }