Skip to content

Commit

Permalink
BI-489 - ConcurrentModificationException during concurrent artifact d…
Browse files Browse the repository at this point in the history
…ownload (jfrog#237)
  • Loading branch information
yahavi authored Jul 11, 2019
1 parent 5eed73d commit a08fb78
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
/**
* @author Noam Y. Tenne
*/
public class ArtifactoryHttpClient {
public class ArtifactoryHttpClient implements AutoCloseable {

public static final ArtifactoryVersion UNKNOWN_PROPERTIES_TOLERANT_ARTIFACTORY_VERSION =
new ArtifactoryVersion("2.2.3");
Expand Down Expand Up @@ -129,6 +129,7 @@ public int getConnectionRetries() {
/**
* Release all connection and cleanup resources.
*/
@Override
public void close() {
if (deployClient != null) {
deployClient.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.*;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
import org.jfrog.build.api.util.Log;

Expand All @@ -47,36 +48,38 @@
*
* @author Yossi Shaul
*/
public class PreemptiveHttpClient {
public class PreemptiveHttpClient implements AutoCloseable {

private final static String CLIENT_VERSION;
private final boolean requestSentRetryEnabled = true;
private CloseableHttpClient httpClient;
private HttpClientContext localContext = HttpClientContext.create();
private static final boolean REQUEST_SENT_RETRY_ENABLED = true;
public static final int CONNECTION_POOL_SIZE = 10;
private static final String CLIENT_VERSION;

private PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
private BasicCredentialsProvider basicCredentialsProvider = new BasicCredentialsProvider();
private AuthCache authCache = new BasicAuthCache();
private CloseableHttpClient httpClient;
private int connectionRetries;
private Log log;

static {
// initialize client version
Properties properties = new Properties();
InputStream is = PreemptiveHttpClient.class.getResourceAsStream("/bi.client.properties");
if (is != null) {
try {
properties.load(is);
is.close();
} catch (IOException e) {
// ignore, use the default value
}
try (InputStream is = PreemptiveHttpClient.class.getResourceAsStream("/bi.client.properties")) {
properties.load(is);
} catch (IOException e) {
// ignore, use the default value
}
CLIENT_VERSION = properties.getProperty("client.version", "unknown");
}

@SuppressWarnings("WeakerAccess")
public PreemptiveHttpClient(String userName, String password, int timeout, ProxyConfiguration proxyConfiguration, int connectionRetries) {
HttpClientBuilder httpClientBuilder = createHttpClientBuilder(userName, password, timeout, connectionRetries);
if (proxyConfiguration != null) {
setProxyConfiguration(httpClientBuilder, proxyConfiguration);
}
connectionManager.setMaxTotal(CONNECTION_POOL_SIZE);
connectionManager.setDefaultMaxPerRoute(CONNECTION_POOL_SIZE);
httpClient = httpClientBuilder.build();
}

Expand All @@ -87,22 +90,20 @@ private void setProxyConfiguration(HttpClientBuilder httpClientBuilder, ProxyCon
if (proxyConfiguration.username != null) {
basicCredentialsProvider.setCredentials(new AuthScope(proxyConfiguration.host, proxyConfiguration.port),
new UsernamePasswordCredentials(proxyConfiguration.username, proxyConfiguration.password));
localContext.setCredentialsProvider(basicCredentialsProvider);
// Create AuthCache instance
AuthCache authCache = new BasicAuthCache();
authCache = new BasicAuthCache();
// Generate BASIC scheme object and add it to the local auth cache
BasicScheme basicAuth = new BasicScheme();
authCache.put(proxy, basicAuth);
localContext.setAuthCache(authCache);
authCache.put(proxy, new BasicScheme());
}
}

public HttpResponse execute(HttpUriRequest request) throws IOException {
if (localContext != null) {
return httpClient.execute(request, localContext);
} else {
return httpClient.execute(request);
HttpClientContext clientContext = HttpClientContext.create();
clientContext.setCredentialsProvider(basicCredentialsProvider);
if (authCache != null) {
clientContext.setAuthCache(authCache);
}
return httpClient.execute(request, clientContext);
}

private HttpClientBuilder createHttpClientBuilder(String userName, String password, int timeout, int connectionRetries) {
Expand All @@ -117,6 +118,7 @@ private HttpClientBuilder createHttpClientBuilder(String userName, String passwo

HttpClientBuilder builder = HttpClientBuilder
.create()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(requestConfig);

if (StringUtils.isEmpty(userName)) {
Expand All @@ -126,7 +128,6 @@ private HttpClientBuilder createHttpClientBuilder(String userName, String passwo

basicCredentialsProvider.setCredentials(new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT),
new UsernamePasswordCredentials(userName, password));
localContext.setCredentialsProvider(basicCredentialsProvider);

// Add as the first request interceptor
builder.addInterceptorFirst(new PreemptiveAuth());
Expand All @@ -142,12 +143,14 @@ private HttpClientBuilder createHttpClientBuilder(String userName, String passwo
return builder;
}

@Override
public void close() {
try {
httpClient.close();
} catch (IOException e) {
// Do nothing
}
connectionManager.close();
}

public void setLog(Log log) {
Expand All @@ -160,13 +163,13 @@ public void setLog(Log log) {
* @return set of Exceptions that will not be retired
*/
private Set<Class<? extends IOException>> getNonRetriableClasses() {
Set<Class<? extends IOException>> classSet = new HashSet<Class<? extends IOException>>();
Set<Class<? extends IOException>> classSet = new HashSet<>();
classSet.add(SSLException.class);
return classSet;
}

static class PreemptiveAuth implements HttpRequestInterceptor {
public void process(final HttpRequest request, final HttpContext context) throws HttpException, IOException {
public void process(final HttpRequest request, final HttpContext context) throws HttpException {
HttpClientContext finalContext = (HttpClientContext) context;
AuthState authState = finalContext.getTargetAuthState();
// If no auth scheme available yet, try to initialize it preemptively
Expand Down Expand Up @@ -220,7 +223,7 @@ public long getRetryInterval() {
private class PreemptiveRetryHandler extends DefaultHttpRequestRetryHandler {

PreemptiveRetryHandler(int connectionRetries) {
super(connectionRetries, requestSentRetryEnabled, getNonRetriableClasses());
super(connectionRetries, REQUEST_SENT_RETRY_ENABLED, getNonRetriableClasses());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import static org.jfrog.build.client.PreemptiveHttpClient.CONNECTION_POOL_SIZE;

/**
* @author Yahav Itzhak
*/
Expand Down Expand Up @@ -228,20 +230,17 @@ private void populateDependenciesMap(Map<String, Dependency> dependencies, NpmSc
// Set of packages that could not be found in Artifactory
Set<NpmPackageInfo> badPackages = Collections.synchronizedSet(new HashSet<>());
DefaultMutableTreeNode rootNode = NpmDependencyTree.createDependenciesTree(scope, npmDependenciesTree);
try (ArtifactoryDependenciesClient client1 = dependenciesClientBuilder.build();
ArtifactoryDependenciesClient client2 = dependenciesClientBuilder.build();
ArtifactoryDependenciesClient client3 = dependenciesClientBuilder.build()
) {
try (ArtifactoryDependenciesClient client = dependenciesClientBuilder.build()) {
// Create producer Runnable
ProducerRunnableBase[] producerRunnable = new ProducerRunnableBase[]{new NpmExtractorProducer(rootNode)};
// Create consumer Runnables
ConsumerRunnableBase[] consumerRunnables = new ConsumerRunnableBase[]{
new NpmExtractorConsumer(client1, dependencies, badPackages),
new NpmExtractorConsumer(client2, dependencies, badPackages),
new NpmExtractorConsumer(client3, dependencies, badPackages)
new NpmExtractorConsumer(client, dependencies, badPackages),
new NpmExtractorConsumer(client, dependencies, badPackages),
new NpmExtractorConsumer(client, dependencies, badPackages)
};
// Create the deployment executor
ProducerConsumerExecutor deploymentExecutor = new ProducerConsumerExecutor(logger, producerRunnable, consumerRunnables, 10);
ProducerConsumerExecutor deploymentExecutor = new ProducerConsumerExecutor(logger, producerRunnable, consumerRunnables, CONNECTION_POOL_SIZE);
deploymentExecutor.start();
if (!badPackages.isEmpty()) {
logger.info((Arrays.toString(badPackages.toArray())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.google.common.collect.Multimap;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.jfrog.build.api.Artifact;
import org.jfrog.build.api.Dependency;
Expand All @@ -32,6 +31,8 @@
import java.util.Map;
import java.util.Set;

import static org.jfrog.build.client.PreemptiveHttpClient.CONNECTION_POOL_SIZE;

/**
* Created by diman on 24/08/2016.
*/
Expand Down Expand Up @@ -66,7 +67,6 @@ public List<Artifact> uploadArtifactsBySpec(String uploadSpec, File workspace,
/**
* Upload artifacts according to a given spec, return a list describing the deployed items.
*
*
* @param uploadSpec The required spec represented as String
* @param workspace File object that represents the workspace
* @param buildProperties Upload properties
Expand Down Expand Up @@ -99,18 +99,16 @@ public List<Artifact> uploadArtifactsBySpec(String uploadSpec, int numberOfThrea
ArtifactoryBuildInfoClientBuilder clientBuilder) throws Exception {
Spec spec = this.getSpecFromString(uploadSpec, new UploadSpecValidator());

try (BuildInfoClientsArray clients = new BuildInfoClientsArray(numberOfThreads, clientBuilder)) {
// Build the buildInfoClient's
clients.buildBuildInfoClients();
try (ArtifactoryBuildInfoClient client = clientBuilder.build()) {
// Create producer Runnable
ProducerRunnableBase[] producerRunnable = new ProducerRunnableBase[]{new SpecDeploymentProducer(spec, workspace, buildProperties)};
// Create consumer Runnables
ConsumerRunnableBase[] consumerRunnables = new ConsumerRunnableBase[numberOfThreads];
for (int i = 0; i < clients.buildInfoClients.length; i++) {
consumerRunnables[i] = new SpecDeploymentConsumer(clients.buildInfoClients[i]);
for (int i = 0; i < numberOfThreads; i++) {
consumerRunnables[i] = new SpecDeploymentConsumer(client);
}
// Create the deployment executor
ProducerConsumerExecutor deploymentExecutor = new ProducerConsumerExecutor(log, producerRunnable, consumerRunnables, 10);
ProducerConsumerExecutor deploymentExecutor = new ProducerConsumerExecutor(log, producerRunnable, consumerRunnables, CONNECTION_POOL_SIZE);

deploymentExecutor.start();
Set<DeployDetails> deployedArtifacts = ((SpecDeploymentProducer) producerRunnable[0]).getDeployedArtifacts();
Expand All @@ -131,8 +129,8 @@ public static <K, V> Multimap<K, V> createMultiMap(Map<K, V> input) {
* The artifacts will be downloaded using the provided client.
* In case of relative path the artifacts will be downloaded to the targetDirectory.
*
* @param spec the spec to use for download.
* @param client the client to use for download.
* @param spec the spec to use for download.
* @param client the client to use for download.
* @param targetDirectory the target directory in case of relative path in the spec
* @return A list of the downloaded dependencies.
* @throws IOException in case of IOException
Expand Down Expand Up @@ -188,7 +186,7 @@ private void pathToUnixFormat(Spec spec) {
fileSpec.setPattern(fileSpec.getPattern().replaceAll(separator, "/"));
}
if (fileSpec.getExcludePatterns() != null) {
for (int i = 0 ; i < fileSpec.getExcludePatterns().length ; i ++) {
for (int i = 0; i < fileSpec.getExcludePatterns().length; i++) {
if (StringUtils.isNotBlank(fileSpec.getExcludePattern(i))) {
fileSpec.setExcludePattern(fileSpec.getExcludePattern(i).replaceAll(separator, "/"), i);
}
Expand All @@ -197,12 +195,9 @@ private void pathToUnixFormat(Spec spec) {
}
}

public static String getExcludePatternsLogStr(String[] excludePatterns) {
return !ArrayUtils.isEmpty(excludePatterns) ? " with exclude patterns: " + Arrays.toString(excludePatterns) : "";
}

/**
* Builds a map representing Spec's properties
*
* @param props Spec's properties
* @return created properties map
*/
Expand Down Expand Up @@ -233,33 +228,10 @@ private List<Artifact> convertDeployDetailsToArtifacts(Set<DeployDetails> detail
.sha1(detail.getSha1())
.type(ext)
.localPath(detail.getFile().getAbsolutePath())
.remotePath(detail.getTargetRepository() + "/" + detail.getArtifactPath())
.remotePath(detail.getTargetRepository() + "/" + detail.getArtifactPath())
.build();
result.add(artifactBuilder.build());
}
return result;
}

private class BuildInfoClientsArray implements AutoCloseable {
private ArtifactoryBuildInfoClientBuilder clientBuilder;
private ArtifactoryBuildInfoClient[] buildInfoClients;

private BuildInfoClientsArray(int numOfThreads, ArtifactoryBuildInfoClientBuilder clientBuilder) {
this.clientBuilder = clientBuilder;
this.buildInfoClients = new ArtifactoryBuildInfoClient[numOfThreads];
}

private void buildBuildInfoClients() {
for (int i = 0; i < buildInfoClients.length; i++) {
buildInfoClients[i] = clientBuilder.build();
}
}

@Override
public void close() {
for (ArtifactoryBuildInfoClient client : buildInfoClients) {
client.close();
}
}
}
}

0 comments on commit a08fb78

Please sign in to comment.